mz_persist/indexed/columnar/arrow.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Apache Arrow encodings and utils for persist data

use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
use arrow::buffer::{BooleanBuffer, NullBuffer};
use itertools::Itertools;

use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::indexed::encoding::BlobTraceUpdates;
use crate::metrics::ColumnarMetrics;

/// Converts a [`BlobTraceUpdates`] into an [`arrow`] [`RecordBatch`].
///
/// Both Time and Diff are presented externally to persist users as a type
/// parameter that implements [mz_persist_types::Codec64]. Our columnar format
/// intentionally stores them both as i64 columns (as opposed to something like
/// a fixed width binary column) because this allows us additional compression
/// options.
///
/// Also note that we intentionally use an i64 over a u64 for Time. Over the
/// range `[0, i64::MAX]`, the bytes are the same and we've talked at various
/// times about changing Time in mz to an i64. Both millis since unix epoch and
/// nanos since unix epoch easily fit into this range (the latter until some
/// time after year 2200). Using an i64 might be a pessimization for a
/// non-realtime mz source with u64 timestamps in the range `(i64::MAX,
/// u64::MAX]`, but realtime sources are overwhelmingly the common case.
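///
/// A sketch of the resulting [`RecordBatch`] layout (the codec columns
/// `k`/`v` and the structured columns `k_s`/`v_s` each appear only when the
/// corresponding encoding is present in `updates`; all top-level fields are
/// marked non-nullable):
///
/// ```text
/// ("k", Binary) ("v", Binary) ("t", Int64) ("d", Int64) ("k_s", <structured>) ("v_s", <structured>)
/// ```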
pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
    fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
        Arc::new(a.clone())
    }
    // For historical reasons, the codec-encoded columns are placed before T/D,
    // and the structured-encoding columns are placed after.
    let kv = updates
        .records()
        .into_iter()
        .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
    let td = [
        ("t", array_ref(updates.timestamps())),
        ("d", array_ref(updates.diffs())),
    ];
    let ks_vs = updates
        .structured()
        .into_iter()
        .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);

    // We expect all the top-level fields to be fully defined.
    let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
    RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
}

/// Walks the given arrow [`ArrayData`] recursively, dropping null buffers from
/// non-nullable fields.
///
/// Workaround for <https://github.com/apache/arrow-rs/issues/6510>: parquet decoding
/// can generate nulls in non-nullable fields that are only masked by, e.g., a
/// grandparent, but some arrow code expects the direct parent to mask its
/// non-nullable children. Dropping the buffer here prevents those validations
/// from failing. (Top-level arrays are always marked nullable, so they're
/// unaffected.)
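///
/// A sketch of the effect on a single non-nullable child slot:
///
/// ```text
/// before: Int64 (non-nullable), values=[1, 2, 3], nulls=[t, t, f] (spurious)
/// after:  Int64 (non-nullable), values=[1, 2, 3], nulls=None
/// ```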
fn rebuild_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
    let buffers = data.buffers().to_vec();
    let child_data = {
        let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
        let child_iter = data.child_data().iter();
        field_iter
            .zip_eq(child_iter)
            .map(|(f, d)| rebuild_data(d.clone(), f.is_nullable(), metrics))
            .collect()
    };
    let nulls = if nullable {
        data.nulls()
            .map(|n| NullBuffer::new(BooleanBuffer::new(n.buffer().clone(), n.offset(), n.len())))
    } else {
        // The field is non-nullable, so any null buffer here is spurious (see
        // the doc comment above); drop it and count the elision.
        if data.nulls().is_some() {
            metrics.parquet.elided_null_buffers.inc();
        }
        None
    };

    // Note that `build` only performs shallow validations, but since we rebuild the array
    // recursively we will have performed the equivalent of `ArrayData::validate_full` on
    // the output.
    data.into_builder()
        .buffers(buffers)
        .child_data(child_data)
        .nulls(nulls)
        .build()
        .expect("reconstructing valid arrow array")
}

/// Rebuilds the given array, dropping null buffers on non-nullable fields.
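///
/// ```ignore
/// // A usage sketch (assumes an `Int64Array` and a `metrics: &ColumnarMetrics`
/// // in scope):
/// let timestamps: Int64Array = realloc_array(&timestamps, metrics);
/// ```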
pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
    let data = array.to_data();
    // Top-level arrays are always nullable.
    let data = rebuild_data(data, true, metrics);
    A::from(data)
}

/// Rebuilds the given array ref, dropping null buffers on non-nullable fields.
pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
    let data = array.into_data();
    // Top-level arrays are always nullable.
    let data = rebuild_data(data, true, metrics);
    make_array(data)
}

/// Converts an [`arrow`] [`RecordBatch`] into a [`BlobTraceUpdates`] and reallocates the backing data.
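///
/// A batch must always carry the `t`/`d` columns; it may carry codec columns
/// (`k`/`v`), structured columns (`k_s`/`v_s`), or both. Any other
/// combination is rejected.
///
/// ```ignore
/// // A round-trip sketch (assumes `updates: BlobTraceUpdates` and
/// // `metrics: &ColumnarMetrics` in scope):
/// let batch = encode_arrow_batch(&updates);
/// let decoded = decode_arrow_batch(&batch, &metrics)?;
/// ```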
pub fn decode_arrow_batch(
    batch: &RecordBatch,
    metrics: &ColumnarMetrics,
) -> anyhow::Result<BlobTraceUpdates> {
    fn try_downcast<A: Array + From<ArrayData> + 'static>(
        batch: &RecordBatch,
        name: &'static str,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<Option<A>> {
        let Some(array_ref) = batch.column_by_name(name) else {
            return Ok(None);
        };
        let col_ref = array_ref
            .as_any()
            .downcast_ref::<A>()
            .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
        let col = realloc_array(col_ref, metrics);
        Ok(Some(col))
    }

    let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
    let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
    let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
        .ok_or_else(|| anyhow!("missing timestamp column"))?;
    let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
        .ok_or_else(|| anyhow!("missing diff column"))?;
    let structured_key = batch
        .column_by_name("k_s")
        .map(|a| realloc_any(Arc::clone(a), metrics));
    let structured_val = batch
        .column_by_name("v_s")
        .map(|a| realloc_any(Arc::clone(a), metrics));

    let updates = match (codec_key, codec_val, structured_key, structured_val) {
        (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
            BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ColumnarRecordsStructuredExt {
                    key: structured_key,
                    val: structured_val,
                },
            )
        }
        (Some(codec_key), Some(codec_val), None, None) => {
            BlobTraceUpdates::Row(ColumnarRecords::new(codec_key, codec_val, timestamps, diffs))
        }
        (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: structured_key,
                val: structured_val,
            },
            timestamps,
            diffs,
        },
        (k, v, ks, vs) => {
            anyhow::bail!(
                "unexpected mix of key/value columns: k={}, v={}, k_s={}, v_s={}",
                k.is_some(),
                v.is_some(),
                ks.is_some(),
                vs.is_some(),
            );
        }
    };

    Ok(updates)
}