// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Apache Arrow encodings and utils for persist data

use std::ptr::NonNull;
use std::sync::Arc;

use anyhow::anyhow;
use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer};
use arrow::datatypes::ToByteSlice;
use mz_dyncfg::Config;

use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::indexed::encoding::BlobTraceUpdates;
use crate::metrics::ColumnarMetrics;

/// Converts a [`ColumnarRecords`] into [`arrow`] columns.
///
/// Both Time and Diff are presented externally to persist users as a type
/// parameter that implements [mz_persist_types::Codec64]. Our columnar format
/// intentionally stores them both as i64 columns (as opposed to something like
/// a fixed width binary column) because this allows us additional compression
/// options.
///
/// Also note that we intentionally use an i64 over a u64 for Time. Over the
/// range `[0, i64::MAX]`, the bytes are the same and we've talked at various
/// times about changing Time in mz to an i64. Both millis since unix epoch and
/// nanos since unix epoch easily fit into this range (the latter until some
/// time after year 2200). Using an i64 might be a pessimization for a
/// non-realtime mz source with u64 timestamps in the range `(i64::MAX,
/// u64::MAX]`, but realtime sources are overwhelmingly the common case.
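///
/// A sketch of the resulting column layout, as an `ignore`d doctest (it
/// assumes a crate-internal `updates: BlobTraceUpdates` value is in scope):
///
/// ```ignore
/// let batch = encode_arrow_batch(&updates);
/// let names: Vec<_> = batch
///     .schema()
///     .fields()
///     .iter()
///     .map(|f| f.name().clone())
///     .collect();
/// // With both codec-encoded and structured data present:
/// assert_eq!(names, ["k", "v", "t", "d", "k_s", "v_s"]);
/// ```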
pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
    fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
        Arc::new(a.clone())
    }
    // For historical reasons, the codec-encoded columns are placed before T/D,
    // and the structured-encoding columns are placed after.
    let kv = updates
        .records()
        .into_iter()
        .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
    let td = [
        ("t", array_ref(updates.timestamps())),
        ("d", array_ref(updates.diffs())),
    ];
    let ks_vs = updates
        .structured()
        .into_iter()
        .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);

    // We expect all the top-level fields to be fully defined.
    let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
    RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
}

pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config<bool> = Config::new(
    "persist_enable_arrow_lgalloc_cc_sizes",
    true,
    "An incident flag to disable copying decoded arrow data into lgalloc on cc sized clusters.",
);

pub(crate) const ENABLE_ARROW_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
    "persist_enable_arrow_lgalloc_noncc_sizes",
    false,
    "A feature flag to enable copying decoded arrow data into lgalloc on non-cc sized clusters.",
);

fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
    // NB: Arrow generally aligns buffers very coarsely: see arrow::alloc::ALIGNMENT.
    // However, lgalloc aligns buffers even more coarsely - to the page boundary -
    // so we never expect alignment issues in practice. If that changes, build()
    // will return an error below, as it does for all invalid data.
    let buffers = data
        .buffers()
        .iter()
        .map(|b| realloc_buffer(b, metrics))
        .collect();
    let child_data = {
        let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
        let child_iter = data.child_data().iter();
        field_iter
            .zip(child_iter)
            .map(|(f, d)| realloc_data(d.clone(), f.is_nullable(), metrics))
            .collect()
    };
    let nulls = if nullable {
        data.nulls().map(|n| {
            let buffer = realloc_buffer(n.buffer(), metrics);
            NullBuffer::new(BooleanBuffer::new(buffer, n.offset(), n.len()))
        })
    } else {
        if data.nulls().is_some() {
            // This is a workaround for: https://github.com/apache/arrow-rs/issues/6510
            // It should always be safe to drop the null buffer for a non-nullable field, since
            // any nulls cannot possibly represent real data and thus must be masked off at
            // some higher level. We always realloc data we get back from parquet, so this is
            // a convenient and efficient place to do the rewrite.
            // Why does this help? Parquet decoding can generate nulls in non-nullable fields
            // that are only masked by eg. a grandparent, not the direct parent... but some arrow
            // code expects the parent to mask any nulls in its non-nullable children. Dropping
            // the buffer here prevents those validations from failing. (Top-level arrays are
            // always marked nullable, but since they don't have parents that's not a problem
            // either.)
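            //
            // A concrete (hypothetical) shape: a nullable struct `outer` holding a
            // non-nullable struct `inner` holding a non-nullable Int64 child. For a row
            // where `outer` is null, parquet decoding may set a null bit on the Int64
            // child that only `outer` (the grandparent) masks; `inner`, being
            // non-nullable, has no null buffer of its own to mask it with.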
            metrics.parquet.elided_null_buffers.inc();
        }
        None
    };

    // Note that `build` only performs shallow validations, but since we rebuild the array
    // recursively we will have performed the equivalent of `ArrayData::validate_full` on
    // the output.
    data.into_builder()
        .buffers(buffers)
        .child_data(child_data)
        .nulls(nulls)
        .build()
        .expect("reconstructing valid arrow array")
}

/// Re-allocate the backing storage for a specific array using lgalloc, if it's configured.
/// (And hopefully-temporarily work around a parquet decoding issue upstream.)
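///
/// A minimal usage sketch, as an `ignore`d doctest (it relies on a
/// crate-internal `metrics: ColumnarMetrics` handle being in scope):
///
/// ```ignore
/// use arrow::array::Int64Array;
///
/// let diffs = Int64Array::from(vec![1i64, -1, 1]);
/// // Logically identical, but the backing buffers may now live in lgalloc.
/// let diffs = realloc_array(&diffs, &metrics);
/// assert_eq!(diffs.len(), 3);
/// ```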
pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
    let data = array.to_data();
    // Top-level arrays are always nullable.
    let data = realloc_data(data, true, metrics);
    A::from(data)
}

/// Re-allocate the backing storage for an array ref using lgalloc, if it's configured.
/// (And hopefully-temporarily work around a parquet decoding issue upstream.)
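///
/// Like [`realloc_array`], but for type-erased [`ArrayRef`]s; a sketch
/// (`ignore`d for the same reason, with `batch` and `metrics` assumed in scope):
///
/// ```ignore
/// let column: ArrayRef = Arc::clone(batch.column(0));
/// let column = realloc_any(column, &metrics);
/// ```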
pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
    let data = array.into_data();
    // Top-level arrays are always nullable.
    let data = realloc_data(data, true, metrics);
    make_array(data)
}

fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer {
    let use_lgbytes_mmap = if metrics.is_cc_active {
        ENABLE_ARROW_LGALLOC_CC_SIZES.get(&metrics.cfg)
    } else {
        ENABLE_ARROW_LGALLOC_NONCC_SIZES.get(&metrics.cfg)
    };
    let region = if use_lgbytes_mmap {
        metrics
            .lgbytes_arrow
            .try_mmap_region(buffer.as_slice())
            .ok()
    } else {
        None
    };
    let Some(region) = region else {
        return buffer.clone();
    };
    let bytes: &[u8] = region.as_ref().to_byte_slice();
    let ptr: NonNull<[u8]> = bytes.into();
    // This is fine: see [`NonNull::as_non_null_ptr`] for an unstable version of this usage.
    let ptr: NonNull<u8> = ptr.cast();
    // SAFETY: `ptr` is valid for `len` bytes, and kept alive as long as `region` lives.
    unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) }
}

/// Converts an [`arrow`] [RecordBatch] into a [BlobTraceUpdates] and reallocates the
/// backing data.
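///
/// A round-trip sketch, as an `ignore`d doctest (it assumes crate-internal
/// `updates: BlobTraceUpdates` and `metrics: ColumnarMetrics` values in scope):
///
/// ```ignore
/// let batch = encode_arrow_batch(&updates);
/// let decoded = decode_arrow_batch(&batch, &metrics)?;
/// // Timestamps survive the round trip unchanged.
/// assert_eq!(decoded.timestamps(), updates.timestamps());
/// ```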
pub fn decode_arrow_batch(
    batch: &RecordBatch,
    metrics: &ColumnarMetrics,
) -> anyhow::Result<BlobTraceUpdates> {
    fn try_downcast<A: Array + From<ArrayData> + 'static>(
        batch: &RecordBatch,
        name: &'static str,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<Option<A>> {
        let Some(array_ref) = batch.column_by_name(name) else {
            return Ok(None);
        };
        let col_ref = array_ref
            .as_any()
            .downcast_ref::<A>()
            .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
        let col = realloc_array(col_ref, metrics);
        Ok(Some(col))
    }

    let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
    let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
    let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
        .ok_or_else(|| anyhow!("missing timestamp column"))?;
    let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
        .ok_or_else(|| anyhow!("missing diff column"))?;
    let structured_key = batch
        .column_by_name("k_s")
        .map(|a| realloc_any(Arc::clone(a), metrics));
    let structured_val = batch
        .column_by_name("v_s")
        .map(|a| realloc_any(Arc::clone(a), metrics));

    let updates = match (codec_key, codec_val, structured_key, structured_val) {
        (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
            BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ColumnarRecordsStructuredExt {
                    key: structured_key,
                    val: structured_val,
                },
            )
        }
        (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
            ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
        ),
        (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: structured_key,
                val: structured_val,
            },
            timestamps,
            diffs,
        },
        (k, v, ks, vs) => {
            anyhow::bail!(
                "unexpected mix of key/value columns: k={:?}, v={:?}, k_s={:?}, v_s={:?}",
                k.is_some(),
                v.is_some(),
                ks.is_some(),
                vs.is_some(),
            );
        }
    };

    Ok(updates)
}