mz_persist/indexed/columnar/arrow.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Apache Arrow encodings and utils for persist data

use std::ptr::NonNull;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer};
use arrow::datatypes::ToByteSlice;
use itertools::Itertools;
use mz_dyncfg::Config;

use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::indexed::encoding::BlobTraceUpdates;
use crate::metrics::ColumnarMetrics;

/// Converts a [`BlobTraceUpdates`] into [`arrow`] columns.
///
/// Both Time and Diff are presented externally to persist users as a type
/// parameter that implements [mz_persist_types::Codec64]. Our columnar format
/// intentionally stores them both as i64 columns (as opposed to something like
/// a fixed width binary column) because this allows us additional compression
/// options.
///
/// Also note that we intentionally use an i64 over a u64 for Time. Over the
/// range `[0, i64::MAX]`, the bytes are the same and we've talked at various
/// times about changing Time in mz to an i64. Both millis since unix epoch and
/// nanos since unix epoch easily fit into this range (the latter until some
/// time after year 2200). Using an i64 might be a pessimization for a
/// non-realtime mz source with u64 timestamps in the range `(i64::MAX,
/// u64::MAX]`, but realtime sources are overwhelmingly the common case.
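///
/// A sketch of intended usage; hedged with `ignore` since it assumes a
/// hypothetical `some_updates()` helper that yields a populated
/// [`BlobTraceUpdates`] with codec-encoded key/value data:
///
/// ```ignore
/// let updates: BlobTraceUpdates = some_updates(); // hypothetical helper
/// let batch: RecordBatch = encode_arrow_batch(&updates);
/// // Codec columns come first, then t/d, then any structured columns.
/// assert_eq!(batch.schema().field(0).name(), "k");
/// ```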
pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
    fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
        Arc::new(a.clone())
    }
    // For historical reasons, the codec-encoded columns are placed before T/D,
    // and the structured-encoding columns are placed after.
    let kv = updates
        .records()
        .into_iter()
        .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
    let td = [
        ("t", array_ref(updates.timestamps())),
        ("d", array_ref(updates.diffs())),
    ];
    let ks_vs = updates
        .structured()
        .into_iter()
        .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);

    // We expect all the top-level fields to be fully defined.
    let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
    RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
}

pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config<bool> = Config::new(
    "persist_enable_arrow_lgalloc_cc_sizes",
    true,
    "An incident flag to disable copying decoded arrow data into lgalloc on cc sized clusters.",
);

pub(crate) const ENABLE_ARROW_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
    "persist_enable_arrow_lgalloc_noncc_sizes",
    false,
    "A feature flag to enable copying decoded arrow data into lgalloc on non-cc sized clusters.",
);

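/// Re-allocates the buffers of the given [`ArrayData`] (and, recursively, its
/// children) via [realloc_buffer], keeping the null buffer only for nullable
/// fields.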
fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
    // NB: Arrow generally aligns buffers very coarsely: see arrow::alloc::ALIGNMENT.
    // However, lgalloc aligns buffers even more coarsely - to the page boundary -
    // so we never expect alignment issues in practice. If that changes, build()
    // will return an error below, as it does for all invalid data.
    let buffers = data
        .buffers()
        .iter()
        .map(|b| realloc_buffer(b, metrics))
        .collect();
    let child_data = {
        let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
        let child_iter = data.child_data().iter();
        field_iter
            .zip_eq(child_iter)
            .map(|(f, d)| realloc_data(d.clone(), f.is_nullable(), metrics))
            .collect()
    };
    let nulls = if nullable {
        data.nulls().map(|n| {
            let buffer = realloc_buffer(n.buffer(), metrics);
            NullBuffer::new(BooleanBuffer::new(buffer, n.offset(), n.len()))
        })
    } else {
        if data.nulls().is_some() {
            // This is a workaround for: https://github.com/apache/arrow-rs/issues/6510
            // It should always be safe to drop the null buffer for a non-nullable field, since
            // any nulls cannot possibly represent real data and thus must be masked off at
            // some higher level. We always realloc data we get back from parquet, so this is
            // a convenient and efficient place to do the rewrite.
            // Why does this help? Parquet decoding can generate nulls in non-nullable fields
            // that are only masked by eg. a grandparent, not the direct parent... but some arrow
            // code expects the parent to mask any nulls in its non-nullable children. Dropping
            // the buffer here prevents those validations from failing. (Top-level arrays are always
            // marked nullable, but since they don't have parents that's not a problem either.)
            metrics.parquet.elided_null_buffers.inc();
        }
        None
    };

    // Note that `build` only performs shallow validations, but since we rebuild the array
    // recursively we will have performed the equivalent of `ArrayData::validate_full` on
    // the output.
    data.into_builder()
        .buffers(buffers)
        .child_data(child_data)
        .nulls(nulls)
        .build()
        .expect("reconstructing valid arrow array")
}

/// Re-allocate the backing storage for a specific array using lgalloc, if it's configured.
/// (And hopefully-temporarily work around a parquet decoding issue upstream.)
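///
/// A sketch of intended usage; `ignore`d since it assumes a `ColumnarMetrics`
/// handle (here named `metrics`) from the surrounding crate:
///
/// ```ignore
/// let array = Int64Array::from(vec![1, 2, 3]);
/// // Same contents, but copied into lgalloc-backed memory when enabled.
/// let array = realloc_array(&array, &metrics);
/// ```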
pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
    let data = array.to_data();
    // Top-level arrays are always nullable.
    let data = realloc_data(data, true, metrics);
    A::from(data)
}

/// Re-allocate the backing storage for an array ref using lgalloc, if it's configured.
/// (And hopefully-temporarily work around a parquet decoding issue upstream.)
pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
    let data = array.into_data();
    // Top-level arrays are always nullable.
    let data = realloc_data(data, true, metrics);
    make_array(data)
}

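/// Copies the contents of `buffer` into an lgalloc-backed, memory-mapped
/// region when the relevant flag is enabled for this cluster type; otherwise
/// returns a cheap (refcounted) clone of the original buffer.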
fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer {
    let use_lgbytes_mmap = if metrics.is_cc_active {
        ENABLE_ARROW_LGALLOC_CC_SIZES.get(&metrics.cfg)
    } else {
        ENABLE_ARROW_LGALLOC_NONCC_SIZES.get(&metrics.cfg)
    };
    let region = if use_lgbytes_mmap {
        metrics
            .lgbytes_arrow
            .try_mmap_region(buffer.as_slice())
            .ok()
    } else {
        None
    };
    let Some(region) = region else {
        return buffer.clone();
    };
    let bytes: &[u8] = region.as_ref().to_byte_slice();
    let ptr: NonNull<[u8]> = bytes.into();
    // This is fine: see [[NonNull::as_non_null_ptr]] for an unstable version of this usage.
    let ptr: NonNull<u8> = ptr.cast();
    // SAFETY: `ptr` is valid for `len` bytes, and kept alive as long as `region` lives.
    unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) }
}

/// Converts an [`arrow`] [RecordBatch] into a [BlobTraceUpdates], reallocating the backing data.
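///
/// A round-trip sketch; `ignore`d since it assumes `updates` and `metrics`
/// values as in the examples above:
///
/// ```ignore
/// let batch = encode_arrow_batch(&updates);
/// let decoded = decode_arrow_batch(&batch, &metrics)?;
/// ```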
pub fn decode_arrow_batch(
    batch: &RecordBatch,
    metrics: &ColumnarMetrics,
) -> anyhow::Result<BlobTraceUpdates> {
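    // Looks up a column by name, downcasts it to the expected concrete array
    // type, and re-allocates its backing data. Returns `None` if the column
    // is absent, and an error if it has the wrong datatype.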
    fn try_downcast<A: Array + From<ArrayData> + 'static>(
        batch: &RecordBatch,
        name: &'static str,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<Option<A>> {
        let Some(array_ref) = batch.column_by_name(name) else {
            return Ok(None);
        };
        let col_ref = array_ref
            .as_any()
            .downcast_ref::<A>()
            .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
        let col = realloc_array(col_ref, metrics);
        Ok(Some(col))
    }

    let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
    let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
    let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
        .ok_or_else(|| anyhow!("missing timestamp column"))?;
    let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
        .ok_or_else(|| anyhow!("missing diff column"))?;
    let structured_key = batch
        .column_by_name("k_s")
        .map(|a| realloc_any(Arc::clone(a), metrics));
    let structured_val = batch
        .column_by_name("v_s")
        .map(|a| realloc_any(Arc::clone(a), metrics));

    let updates = match (codec_key, codec_val, structured_key, structured_val) {
        (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
            BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ColumnarRecordsStructuredExt {
                    key: structured_key,
                    val: structured_val,
                },
            )
        }
        (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
            ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
        ),
        (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: structured_key,
                val: structured_val,
            },
            timestamps,
            diffs,
        },
        (k, v, ks, vs) => {
            anyhow::bail!(
                "unexpected mix of key/value columns: k={}, v={}, k_s={}, v_s={}",
                k.is_some(),
                v.is_some(),
                ks.is_some(),
                vs.is_some(),
            );
        }
    };

    Ok(updates)
}