mz_persist/indexed/columnar/
arrow.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Apache Arrow encodings and utils for persist data
11
12use std::ptr::NonNull;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
17use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer};
18use arrow::datatypes::ToByteSlice;
19use mz_dyncfg::Config;
20
21use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
22use crate::indexed::encoding::BlobTraceUpdates;
23use crate::metrics::ColumnarMetrics;
24
25/// Converts a [`ColumnarRecords`] into [`arrow`] columns.
26///
27/// Both Time and Diff are presented externally to persist users as a type
28/// parameter that implements [mz_persist_types::Codec64]. Our columnar format
29/// intentionally stores them both as i64 columns (as opposed to something like
30/// a fixed width binary column) because this allows us additional compression
31/// options.
32///
33/// Also note that we intentionally use an i64 over a u64 for Time. Over the
34/// range `[0, i64::MAX]`, the bytes are the same and we've talked at various
35/// times about changing Time in mz to an i64. Both millis since unix epoch and
36/// nanos since unix epoch easily fit into this range (the latter until some
37/// time after year 2200). Using a i64 might be a pessimization for a
38/// non-realtime mz source with u64 timestamps in the range `(i64::MAX,
39/// u64::MAX]`, but realtime sources are overwhelmingly the common case.
40pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
41    fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
42        Arc::new(a.clone())
43    }
44    // For historical reasons, the codec-encoded columns are placed before T/D,
45    // and the structured-encoding columns are placed after.
46    let kv = updates
47        .records()
48        .into_iter()
49        .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
50    let td = [
51        ("t", array_ref(updates.timestamps())),
52        ("d", array_ref(updates.diffs())),
53    ];
54    let ks_vs = updates
55        .structured()
56        .into_iter()
57        .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);
58
59    // We expect all the top-level fields to be fully defined.
60    let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
61    RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
62}
63
/// Dyncfg flag `persist_enable_arrow_lgalloc_cc_sizes`, constructed with the
/// value `true` (presumably its default; confirm against `Config::new`).
///
/// `realloc_buffer` reads this flag when `metrics.is_cc_active` is true. When
/// the flag reads `false`, buffers are handed back as a clone of the input
/// instead of being copied into an lgalloc-backed region.
pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config<bool> = Config::new(
    "persist_enable_arrow_lgalloc_cc_sizes",
    true,
    "An incident flag to disable copying decoded arrow data into lgalloc on cc sized clusters.",
);
69
/// Dyncfg flag `persist_enable_arrow_lgalloc_noncc_sizes`, constructed with the
/// value `false` (presumably its default; confirm against `Config::new`).
///
/// `realloc_buffer` reads this flag when `metrics.is_cc_active` is false. Only
/// when the flag reads `true` does it attempt to copy buffers into an
/// lgalloc-backed region.
pub(crate) const ENABLE_ARROW_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
    "persist_enable_arrow_lgalloc_noncc_sizes",
    false,
    "A feature flag to enable copying decoded arrow data into lgalloc on non-cc sized clusters.",
);
75
76fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
77    // NB: Arrow generally aligns buffers very coarsely: see arrow::alloc::ALIGNMENT.
78    // However, lgalloc aligns buffers even more coarsely - to the page boundary -
79    // so we never expect alignment issues in practice. If that changes, build()
80    // will return an error below, as it does for all invalid data.
81    let buffers = data
82        .buffers()
83        .iter()
84        .map(|b| realloc_buffer(b, metrics))
85        .collect();
86    let child_data = {
87        let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
88        let child_iter = data.child_data().iter();
89        field_iter
90            .zip(child_iter)
91            .map(|(f, d)| realloc_data(d.clone(), f.is_nullable(), metrics))
92            .collect()
93    };
94    let nulls = if nullable {
95        data.nulls().map(|n| {
96            let buffer = realloc_buffer(n.buffer(), metrics);
97            NullBuffer::new(BooleanBuffer::new(buffer, n.offset(), n.len()))
98        })
99    } else {
100        if data.nulls().is_some() {
101            // This is a workaround for: https://github.com/apache/arrow-rs/issues/6510
102            // It should always be safe to drop the null buffer for a non-nullable field, since
103            // any nulls cannot possibly represent real data and thus must be masked off at
104            // some higher level. We always realloc data we get back from parquet, so this is
105            // a convenient and efficient place to do the rewrite.
106            // Why does this help? Parquet decoding can generate nulls in non-nullable fields
107            // that are only masked by eg. a grandparent, not the direct parent... but some arrow
108            // code expects the parent to mask any nulls in its non-nullable children. Dropping
109            // the buffer here prevents those validations from failing. (Top-level arrays are always
110            // marked nullable, but since they don't have parents that's not a problem either.)
111            metrics.parquet.elided_null_buffers.inc();
112        }
113        None
114    };
115
116    // Note that `build` only performs shallow validations, but since we rebuild the array
117    // recursively we will have performed the equivalent of `ArrayData::validation_full` on
118    // the output.
119    data.into_builder()
120        .buffers(buffers)
121        .child_data(child_data)
122        .nulls(nulls)
123        .build()
124        .expect("reconstructing valid arrow array")
125}
126
127/// Re-allocate the backing storage for a specific array using lgalloc, if it's configured.
128/// (And hopefully-temporarily work around a parquet decoding issue upstream.)
129pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
130    let data = array.to_data();
131    // Top-level arrays are always nullable.
132    let data = realloc_data(data, true, metrics);
133    A::from(data)
134}
135
136/// Re-allocate the backing storage for an array ref using lgalloc, if it's configured.
137/// (And hopefully-temporarily work around a parquet decoding issue upstream.)
138pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
139    let data = array.into_data();
140    // Top-level arrays are always nullable.
141    let data = realloc_data(data, true, metrics);
142    make_array(data)
143}
144
145fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer {
146    let use_lgbytes_mmap = if metrics.is_cc_active {
147        ENABLE_ARROW_LGALLOC_CC_SIZES.get(&metrics.cfg)
148    } else {
149        ENABLE_ARROW_LGALLOC_NONCC_SIZES.get(&metrics.cfg)
150    };
151    let region = if use_lgbytes_mmap {
152        metrics
153            .lgbytes_arrow
154            .try_mmap_region(buffer.as_slice())
155            .ok()
156    } else {
157        None
158    };
159    let Some(region) = region else {
160        return buffer.clone();
161    };
162    let bytes: &[u8] = region.as_ref().to_byte_slice();
163    let ptr: NonNull<[u8]> = bytes.into();
164    // This is fine: see [[NonNull::as_non_null_ptr]] for an unstable version of this usage.
165    let ptr: NonNull<u8> = ptr.cast();
166    // SAFETY: `ptr` is valid for `len` bytes, and kept alive as long as `region` lives.
167    unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) }
168}
169
170/// Converts an [`arrow`] [RecordBatch] into a [BlobTraceUpdates] and reallocate the backing data.
171pub fn decode_arrow_batch(
172    batch: &RecordBatch,
173    metrics: &ColumnarMetrics,
174) -> anyhow::Result<BlobTraceUpdates> {
175    fn try_downcast<A: Array + From<ArrayData> + 'static>(
176        batch: &RecordBatch,
177        name: &'static str,
178        metrics: &ColumnarMetrics,
179    ) -> anyhow::Result<Option<A>> {
180        let Some(array_ref) = batch.column_by_name(name) else {
181            return Ok(None);
182        };
183        let col_ref = array_ref
184            .as_any()
185            .downcast_ref::<A>()
186            .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
187        let col = realloc_array(col_ref, metrics);
188        Ok(Some(col))
189    }
190
191    let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
192    let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
193    let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
194        .ok_or_else(|| anyhow!("missing timestamp column"))?;
195    let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
196        .ok_or_else(|| anyhow!("missing diff column"))?;
197    let structured_key = batch
198        .column_by_name("k_s")
199        .map(|a| realloc_any(Arc::clone(a), metrics));
200    let structured_val = batch
201        .column_by_name("v_s")
202        .map(|a| realloc_any(Arc::clone(a), metrics));
203
204    let updates = match (codec_key, codec_val, structured_key, structured_val) {
205        (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
206            BlobTraceUpdates::Both(
207                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
208                ColumnarRecordsStructuredExt {
209                    key: structured_key,
210                    val: structured_val,
211                },
212            )
213        }
214        (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
215            ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
216        ),
217        (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
218            key_values: ColumnarRecordsStructuredExt {
219                key: structured_key,
220                val: structured_val,
221            },
222            timestamps,
223            diffs,
224        },
225        (k, v, ks, vs) => {
226            anyhow::bail!(
227                "unexpected mix of key/value columns: k={:?}, v={}, k_s={}, v_s={}",
228                k.is_some(),
229                v.is_some(),
230                ks.is_some(),
231                vs.is_some(),
232            );
233        }
234    };
235
236    Ok(updates)
237}