mz_persist/indexed/columnar/parquet.rs
use std::io::Write;
use std::sync::Arc;

use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_persist_types::Codec64;
use mz_persist_types::parquet::EncodingConfig;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
use parquet::basic::Encoding;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use timely::progress::{Antichain, Timestamp};
use tracing::warn;

use crate::error::Error;
use crate::generated::persist::ProtoBatchFormat;
use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::indexed::columnar::arrow::{decode_arrow_batch, encode_arrow_batch};
use crate::indexed::encoding::{
    BlobTraceBatchPart, BlobTraceUpdates, decode_trace_inline_meta, encode_trace_inline_meta,
};
use crate::metrics::{ColumnarMetrics, ParquetColumnMetrics};

const INLINE_METADATA_KEY: &str = "MZ:inline";

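/// Encodes a [`BlobTraceBatchPart`] as Parquet and writes it to `w`.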
pub fn encode_trace_parquet<W: Write + Send, T: Timestamp + Codec64>(
    w: &mut W,
    batch: &BlobTraceBatchPart<T>,
    metrics: &ColumnarMetrics,
    cfg: &EncodingConfig,
) -> Result<(), Error> {
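    // Better to fail here than to write out an invalid batch.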
    batch.validate()?;

    let inline_meta = encode_trace_inline_meta(batch);
    encode_parquet_kvtd(w, inline_meta, &batch.updates, metrics, cfg)
}

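/// Decodes a [`BlobTraceBatchPart`] from Parquet, using the inline metadata
/// stored under the `MZ:inline` key-value entry to determine the batch format.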
pub fn decode_trace_parquet<T: Timestamp + Codec64>(
    buf: SegmentedBytes,
    metrics: &ColumnarMetrics,
) -> Result<BlobTraceBatchPart<T>, Error> {
    let metadata = ArrowReaderMetadata::load(&buf, Default::default())?;
    let metadata = metadata
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .as_ref()
        .and_then(|x| x.iter().find(|x| x.key == INLINE_METADATA_KEY));

    let (format, metadata) = decode_trace_inline_meta(metadata.and_then(|x| x.value.as_ref()))?;
    let updates = match format {
        ProtoBatchFormat::Unknown => return Err("unknown format".into()),
        ProtoBatchFormat::ArrowKvtd => {
            return Err("ArrowKVTD format not supported in parquet".into());
        }
        ProtoBatchFormat::ParquetKvtd => decode_parquet_file_kvtd(buf, None, metrics)?,
        ProtoBatchFormat::ParquetStructured => {
            let format_metadata = metadata
                .format_metadata
                .as_ref()
                .ok_or_else(|| "missing field 'format_metadata'".to_string())?;
            decode_parquet_file_kvtd(buf, Some(format_metadata), metrics)?
        }
    };

    let ret = BlobTraceBatchPart {
        desc: metadata.desc.map_or_else(
            || {
                Description::new(
                    Antichain::from_elem(T::minimum()),
                    Antichain::from_elem(T::minimum()),
                    Antichain::from_elem(T::minimum()),
                )
            },
            |x| x.into(),
        ),
        index: metadata.index,
        updates,
    };
    ret.validate()?;
    Ok(ret)
}

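/// Encodes [`BlobTraceUpdates`] as a Parquet file, stashing `inline_base64` in
/// the file's key-value metadata under the `MZ:inline` key.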
pub fn encode_parquet_kvtd<W: Write + Send>(
    w: &mut W,
    inline_base64: String,
    updates: &BlobTraceUpdates,
    metrics: &ColumnarMetrics,
    cfg: &EncodingConfig,
) -> Result<(), Error> {
    let metadata = KeyValue::new(INLINE_METADATA_KEY.to_string(), inline_base64);

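    // Write a single unbounded RowGroup with plain encoding and no column
    // statistics; dictionary encoding and compression come from `cfg`.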
    let properties = WriterProperties::builder()
        .set_dictionary_enabled(cfg.use_dictionary)
        .set_encoding(Encoding::PLAIN)
        .set_statistics_enabled(EnabledStatistics::None)
        .set_compression(cfg.compression.into())
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_data_page_size_limit(1024 * 1024)
        .set_max_row_group_size(usize::MAX)
        .set_key_value_metadata(Some(vec![metadata]))
        .build();

    let batch = encode_arrow_batch(updates);
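    // The column layout of this batch, used to label the metrics reported below.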
    let format = match updates {
        BlobTraceUpdates::Row(_) => "k,v,t,d",
        BlobTraceUpdates::Both(_, _) => "k,v,t,d,k_s,v_s",
        BlobTraceUpdates::Structured { .. } => "t,d,k_s,v_s",
    };

    let mut writer = ArrowWriter::try_new(w, batch.schema(), Some(properties))?;
    writer.write(&batch)?;

    writer.flush()?;
    let bytes_written = writer.bytes_written();
    let file_metadata = writer.close()?;

    report_parquet_metrics(metrics, &file_metadata, bytes_written, format);

    Ok(())
}

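/// Decodes [`BlobTraceUpdates`] from a Parquet file, expecting at most one
/// RowGroup; `format_metadata` selects between the plain KVTD layout and the
/// structured-migration layouts.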
pub fn decode_parquet_file_kvtd(
    r: impl parquet::file::reader::ChunkReader + 'static,
    format_metadata: Option<&ProtoFormatMetadata>,
    metrics: &ColumnarMetrics,
) -> Result<BlobTraceUpdates, Error> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(r)?;

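    // We expect a single RowGroup and read all of its rows as one RecordBatch,
    // so size the reader's batches to the RowGroup's row count.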
    let row_groups = builder.metadata().row_groups();
    if row_groups.len() > 1 {
        return Err(Error::String("found more than 1 RowGroup".to_string()));
    }
    let num_rows = usize::try_from(row_groups[0].num_rows())
        .map_err(|_| Error::String("found negative rows".to_string()))?;
    let builder = builder.with_batch_size(num_rows);

    let schema = Arc::clone(builder.schema());
    let mut reader = builder.build()?;

    match format_metadata {
        None => {
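            // Plain k, v, t, d data: collect every RecordBatch and concatenate
            // them into a single batch before decoding.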
            let mut ret = Vec::new();
            for batch in reader {
                let batch = batch.map_err(|e| Error::String(e.to_string()))?;
                ret.push(batch);
            }
            if ret.len() != 1 {
                warn!("unexpected number of row groups: {}", ret.len());
            }
            let batch = ::arrow::compute::concat_batches(&schema, &ret)?;
            let updates = decode_arrow_batch(&batch, metrics).map_err(|e| e.to_string())?;
            Ok(updates)
        }
        Some(ProtoFormatMetadata::StructuredMigration(v @ 1..=3)) => {
            let mut batch = reader
                .next()
                .ok_or_else(|| Error::String("found empty batch".to_string()))??;

            if reader.next().is_some() {
                return Err(Error::String("found more than one RowGroup".to_string()));
            }

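            // Migration version 1 only uses the first four (k, v, t, d)
            // columns, so drop any extra structured columns.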
188 if *v == 1 && batch.num_columns() > 4 {
190 batch = batch.project(&[0, 1, 2, 3])?;
191 }
192
193 let updates = decode_arrow_batch(&batch, metrics).map_err(|e| e.to_string())?;
194 Ok(updates)
195 }
196 unknown => Err(format!("unkown ProtoFormatMetadata, {unknown:?}"))?,
197 }
198}
199
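/// Reports metrics for a just-written Parquet file: row-group count, total
/// encoded size, and per-column uncompressed/compressed sizes.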
fn report_parquet_metrics(
    metrics: &ColumnarMetrics,
    metadata: &parquet::format::FileMetaData,
    bytes_written: usize,
    format: &'static str,
) {
    metrics
        .parquet()
        .num_row_groups
        .with_label_values(&[format])
        .inc_by(u64::cast_from(metadata.row_groups.len()));
    metrics
        .parquet()
        .encoded_size
        .with_label_values(&[format])
        .inc_by(u64::cast_from(bytes_written));

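    // Sums the uncompressed and compressed sizes of every column chunk whose
    // top-level schema path matches `col_name`.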
    let report_column_size = |col_name: &str, metrics: &ParquetColumnMetrics| {
        let (uncomp, comp) = metadata
            .row_groups
            .iter()
            .flat_map(|row_group| row_group.columns.iter())
            .filter_map(|col_chunk| col_chunk.meta_data.as_ref())
            .filter(|m| m.path_in_schema.first().map(|s| s.as_str()) == Some(col_name))
            .map(|m| (m.total_uncompressed_size, m.total_compressed_size))
            .fold((0, 0), |(tot_u, tot_c), (u, c)| (tot_u + u, tot_c + c));

        let uncomp = uncomp.try_into().unwrap_or(0u64);
        let comp = comp.try_into().unwrap_or(0u64);

        metrics.report_sizes(uncomp, comp);
    };

    report_column_size("k", &metrics.parquet().k_metrics);
    report_column_size("v", &metrics.parquet().v_metrics);
    report_column_size("t", &metrics.parquet().t_metrics);
    report_column_size("d", &metrics.parquet().d_metrics);
    report_column_size("k_s", &metrics.parquet().k_s_metrics);
    report_column_size("v_s", &metrics.parquet().v_s_metrics);
}