mz_persist/indexed/columnar/parquet.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Apache Parquet encodings and utils for persist data

use std::io::Write;
use std::sync::Arc;

use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_persist_types::Codec64;
use mz_persist_types::parquet::EncodingConfig;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
use parquet::basic::Encoding;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use timely::progress::{Antichain, Timestamp};
use tracing::warn;

use crate::error::Error;
use crate::generated::persist::ProtoBatchFormat;
use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::indexed::columnar::arrow::{decode_arrow_batch, encode_arrow_batch};
use crate::indexed::encoding::{
    BlobTraceBatchPart, BlobTraceUpdates, decode_trace_inline_meta, encode_trace_inline_meta,
};
use crate::metrics::{ColumnarMetrics, ParquetColumnMetrics};

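// Key under which we store a batch's inline metadata (base64-encoded) in the
// Parquet file's key-value metadata.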
const INLINE_METADATA_KEY: &str = "MZ:inline";

/// Encodes a [`BlobTraceBatchPart`] into the Parquet format.
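///
/// A minimal usage sketch (illustrative, not compiled; assumes a `batch`,
/// `metrics`, and `cfg` constructed elsewhere). Any `Write + Send` sink works,
/// e.g. a `Vec<u8>`:
///
/// ```ignore
/// let mut buf = Vec::new();
/// encode_trace_parquet(&mut buf, &batch, &metrics, &cfg)?;
/// // `buf` now holds a complete Parquet file, including our inline metadata.
/// ```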
pub fn encode_trace_parquet<W: Write + Send, T: Timestamp + Codec64>(
    w: &mut W,
    batch: &BlobTraceBatchPart<T>,
    metrics: &ColumnarMetrics,
    cfg: &EncodingConfig,
) -> Result<(), Error> {
    // Better to error now than write out an invalid batch.
    batch.validate()?;

    let inline_meta = encode_trace_inline_meta(batch);
    encode_parquet_kvtd(w, inline_meta, &batch.updates, metrics, cfg)
}

/// Decodes a [`BlobTraceBatchPart`] from the Parquet format.
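///
/// A minimal usage sketch (illustrative, not compiled; assumes `bytes` is a
/// [`SegmentedBytes`] previously produced by [`encode_trace_parquet`]):
///
/// ```ignore
/// let part: BlobTraceBatchPart<u64> = decode_trace_parquet(bytes, &metrics)?;
/// ```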
pub fn decode_trace_parquet<T: Timestamp + Codec64>(
    buf: SegmentedBytes,
    metrics: &ColumnarMetrics,
) -> Result<BlobTraceBatchPart<T>, Error> {
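    // Parse just the Parquet footer here so we can get at our inline metadata
    // without decoding any column data yet.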
    let metadata = ArrowReaderMetadata::load(&buf, Default::default())?;
    let metadata = metadata
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .as_ref()
        .and_then(|x| x.iter().find(|x| x.key == INLINE_METADATA_KEY));

    let (format, metadata) = decode_trace_inline_meta(metadata.and_then(|x| x.value.as_ref()))?;
    let updates = match format {
        ProtoBatchFormat::Unknown => return Err("unknown format".into()),
        ProtoBatchFormat::ArrowKvtd => {
            return Err("ArrowKVTD format not supported in parquet".into());
        }
        ProtoBatchFormat::ParquetKvtd => decode_parquet_file_kvtd(buf, None, metrics)?,
        ProtoBatchFormat::ParquetStructured => {
            // Even though `format_metadata` is optional, we expect it when
            // our format is ParquetStructured.
            let format_metadata = metadata
                .format_metadata
                .as_ref()
                .ok_or_else(|| "missing field 'format_metadata'".to_string())?;
            decode_parquet_file_kvtd(buf, Some(format_metadata), metrics)?
        }
    };

    let ret = BlobTraceBatchPart {
        desc: metadata.desc.map_or_else(
            || {
                Description::new(
                    Antichain::from_elem(T::minimum()),
                    Antichain::from_elem(T::minimum()),
                    Antichain::from_elem(T::minimum()),
                )
            },
            |x| x.into(),
        ),
        index: metadata.index,
        updates,
    };
    ret.validate()?;
    Ok(ret)
}

/// Encodes [`BlobTraceUpdates`] to Parquet using the [`parquet`] crate.
pub fn encode_parquet_kvtd<W: Write + Send>(
    w: &mut W,
    inline_base64: String,
    updates: &BlobTraceUpdates,
    metrics: &ColumnarMetrics,
    cfg: &EncodingConfig,
) -> Result<(), Error> {
    let metadata = KeyValue::new(INLINE_METADATA_KEY.to_string(), inline_base64);

    // Note: most of these settings are the defaults from `arrow2`, which we
    // previously used; we keep them until we've tuned with benchmarking.
    let properties = WriterProperties::builder()
        .set_dictionary_enabled(cfg.use_dictionary)
        .set_encoding(Encoding::PLAIN)
        .set_statistics_enabled(EnabledStatistics::None)
        .set_compression(cfg.compression.into())
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_data_page_size_limit(1024 * 1024)
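        // Put everything into a single RowGroup; the decode path below expects
        // exactly one per file.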
        .set_max_row_group_size(usize::MAX)
        .set_key_value_metadata(Some(vec![metadata]))
        .build();

    let batch = encode_arrow_batch(updates);
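    // Column-layout label under which we report Parquet metrics for this batch.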
    let format = match updates {
        BlobTraceUpdates::Row(_) => "k,v,t,d",
        BlobTraceUpdates::Both(_, _) => "k,v,t,d,k_s,v_s",
        BlobTraceUpdates::Structured { .. } => "t,d,k_s,v_s",
    };

    let mut writer = ArrowWriter::try_new(w, batch.schema(), Some(properties))?;
    writer.write(&batch)?;

    writer.flush()?;
    let bytes_written = writer.bytes_written();
    let file_metadata = writer.close()?;

    report_parquet_metrics(metrics, &file_metadata, bytes_written, format);

    Ok(())
}

/// Decodes [`BlobTraceUpdates`] from a reader, using [`arrow`].
pub fn decode_parquet_file_kvtd(
    r: impl parquet::file::reader::ChunkReader + 'static,
    format_metadata: Option<&ProtoFormatMetadata>,
    metrics: &ColumnarMetrics,
) -> Result<BlobTraceUpdates, Error> {
    let builder = ParquetRecordBatchReaderBuilder::try_new(r)?;

    // To match arrow2, we default the batch size to the number of rows in the RowGroup.
    let row_groups = builder.metadata().row_groups();
    if row_groups.len() > 1 {
        return Err(Error::String("found more than 1 RowGroup".to_string()));
    }
    let num_rows = usize::try_from(row_groups[0].num_rows())
        .map_err(|_| Error::String("found negative rows".to_string()))?;
    let builder = builder.with_batch_size(num_rows);

    let schema = Arc::clone(builder.schema());
    let mut reader = builder.build()?;

    match format_metadata {
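        // No format metadata: this is the legacy ParquetKvtd layout.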
        None => {
            let mut ret = Vec::new();
            for batch in reader {
                let batch = batch.map_err(|e| Error::String(e.to_string()))?;
                ret.push(batch);
            }
            if ret.len() != 1 {
                warn!("unexpected number of record batches: {}", ret.len());
            }
            let batch = ::arrow::compute::concat_batches(&schema, &ret)?;
            let updates = decode_arrow_batch(&batch, metrics).map_err(|e| e.to_string())?;
            Ok(updates)
        }
        Some(ProtoFormatMetadata::StructuredMigration(v @ 1..=3)) => {
            let mut batch = reader
                .next()
                .ok_or_else(|| Error::String("found empty batch".to_string()))??;

            // We enforce an invariant that we have a single RowGroup.
            if reader.next().is_some() {
                return Err(Error::String("found more than one RowGroup".to_string()));
            }

            // Version 1 is a deprecated format, so we just ignore the k_s and v_s columns.
            if *v == 1 && batch.num_columns() > 4 {
                batch = batch.project(&[0, 1, 2, 3])?;
            }

            let updates = decode_arrow_batch(&batch, metrics).map_err(|e| e.to_string())?;
            Ok(updates)
        }
        unknown => Err(format!("unknown ProtoFormatMetadata, {unknown:?}"))?,
    }
}

/// Best-effort reporting of metrics from the resulting [`parquet::format::FileMetaData`] returned
/// from the [`ArrowWriter`].
fn report_parquet_metrics(
    metrics: &ColumnarMetrics,
    metadata: &parquet::format::FileMetaData,
    bytes_written: usize,
    format: &'static str,
) {
    metrics
        .parquet()
        .num_row_groups
        .with_label_values(&[format])
        .inc_by(u64::cast_from(metadata.row_groups.len()));
    metrics
        .parquet()
        .encoded_size
        .with_label_values(&[format])
        .inc_by(u64::cast_from(bytes_written));

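    // Sum the uncompressed and compressed sizes of every column chunk whose
    // top-level schema path matches `col_name`, then report both totals.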
    let report_column_size = |col_name: &str, metrics: &ParquetColumnMetrics| {
        let (uncomp, comp) = metadata
            .row_groups
            .iter()
            .flat_map(|row_group| row_group.columns.iter())
            .filter_map(|col_chunk| col_chunk.meta_data.as_ref())
            .filter(|m| m.path_in_schema.first().map(|s| s.as_str()) == Some(col_name))
            .map(|m| (m.total_uncompressed_size, m.total_compressed_size))
            .fold((0, 0), |(tot_u, tot_c), (u, c)| (tot_u + u, tot_c + c));

        let uncomp = uncomp.try_into().unwrap_or(0u64);
        let comp = comp.try_into().unwrap_or(0u64);

        metrics.report_sizes(uncomp, comp);
    };

    report_column_size("k", &metrics.parquet().k_metrics);
    report_column_size("v", &metrics.parquet().v_metrics);
    report_column_size("t", &metrics.parquet().t_metrics);
    report_column_size("d", &metrics.parquet().d_metrics);
    report_column_size("k_s", &metrics.parquet().k_s_metrics);
    report_column_size("v_s", &metrics.parquet().v_s_metrics);
}