mz_persist/indexed/columnar/
arrow.rs1use std::ptr::NonNull;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
17use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer};
18use arrow::datatypes::ToByteSlice;
19use itertools::Itertools;
20use mz_dyncfg::Config;
21
22use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
23use crate::indexed::encoding::BlobTraceUpdates;
24use crate::metrics::ColumnarMetrics;
25
26pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
42 fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
43 Arc::new(a.clone())
44 }
45 let kv = updates
48 .records()
49 .into_iter()
50 .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
51 let td = [
52 ("t", array_ref(updates.timestamps())),
53 ("d", array_ref(updates.diffs())),
54 ];
55 let ks_vs = updates
56 .structured()
57 .into_iter()
58 .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);
59
60 let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
62 RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
63}
64
65pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config<bool> = Config::new(
66 "persist_enable_arrow_lgalloc_cc_sizes",
67 true,
68 "An incident flag to disable copying decoded arrow data into lgalloc on cc sized clusters.",
69);
70
71pub(crate) const ENABLE_ARROW_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
72 "persist_enable_arrow_lgalloc_noncc_sizes",
73 false,
74 "A feature flag to enable copying decoded arrow data into lgalloc on non-cc sized clusters.",
75);
76
77fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
78 let buffers = data
83 .buffers()
84 .iter()
85 .map(|b| realloc_buffer(b, metrics))
86 .collect();
87 let child_data = {
88 let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
89 let child_iter = data.child_data().iter();
90 field_iter
91 .zip_eq(child_iter)
92 .map(|(f, d)| realloc_data(d.clone(), f.is_nullable(), metrics))
93 .collect()
94 };
95 let nulls = if nullable {
96 data.nulls().map(|n| {
97 let buffer = realloc_buffer(n.buffer(), metrics);
98 NullBuffer::new(BooleanBuffer::new(buffer, n.offset(), n.len()))
99 })
100 } else {
101 if data.nulls().is_some() {
102 metrics.parquet.elided_null_buffers.inc();
113 }
114 None
115 };
116
117 data.into_builder()
121 .buffers(buffers)
122 .child_data(child_data)
123 .nulls(nulls)
124 .build()
125 .expect("reconstructing valid arrow array")
126}
127
128pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
131 let data = array.to_data();
132 let data = realloc_data(data, true, metrics);
134 A::from(data)
135}
136
137pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
140 let data = array.into_data();
141 let data = realloc_data(data, true, metrics);
143 make_array(data)
144}
145
146fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer {
147 let use_lgbytes_mmap = if metrics.is_cc_active {
148 ENABLE_ARROW_LGALLOC_CC_SIZES.get(&metrics.cfg)
149 } else {
150 ENABLE_ARROW_LGALLOC_NONCC_SIZES.get(&metrics.cfg)
151 };
152 let region = if use_lgbytes_mmap {
153 metrics
154 .lgbytes_arrow
155 .try_mmap_region(buffer.as_slice())
156 .ok()
157 } else {
158 None
159 };
160 let Some(region) = region else {
161 return buffer.clone();
162 };
163 let bytes: &[u8] = region.as_ref().to_byte_slice();
164 let ptr: NonNull<[u8]> = bytes.into();
165 let ptr: NonNull<u8> = ptr.cast();
167 unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) }
169}
170
171pub fn decode_arrow_batch(
173 batch: &RecordBatch,
174 metrics: &ColumnarMetrics,
175) -> anyhow::Result<BlobTraceUpdates> {
176 fn try_downcast<A: Array + From<ArrayData> + 'static>(
177 batch: &RecordBatch,
178 name: &'static str,
179 metrics: &ColumnarMetrics,
180 ) -> anyhow::Result<Option<A>> {
181 let Some(array_ref) = batch.column_by_name(name) else {
182 return Ok(None);
183 };
184 let col_ref = array_ref
185 .as_any()
186 .downcast_ref::<A>()
187 .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
188 let col = realloc_array(col_ref, metrics);
189 Ok(Some(col))
190 }
191
192 let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
193 let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
194 let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
195 .ok_or_else(|| anyhow!("missing timestamp column"))?;
196 let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
197 .ok_or_else(|| anyhow!("missing diff column"))?;
198 let structured_key = batch
199 .column_by_name("k_s")
200 .map(|a| realloc_any(Arc::clone(a), metrics));
201 let structured_val = batch
202 .column_by_name("v_s")
203 .map(|a| realloc_any(Arc::clone(a), metrics));
204
205 let updates = match (codec_key, codec_val, structured_key, structured_val) {
206 (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
207 BlobTraceUpdates::Both(
208 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
209 ColumnarRecordsStructuredExt {
210 key: structured_key,
211 val: structured_val,
212 },
213 )
214 }
215 (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
216 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
217 ),
218 (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
219 key_values: ColumnarRecordsStructuredExt {
220 key: structured_key,
221 val: structured_val,
222 },
223 timestamps,
224 diffs,
225 },
226 (k, v, ks, vs) => {
227 anyhow::bail!(
228 "unexpected mix of key/value columns: k={:?}, v={}, k_s={}, v_s={}",
229 k.is_some(),
230 v.is_some(),
231 ks.is_some(),
232 vs.is_some(),
233 );
234 }
235 };
236
237 Ok(updates)
238}