mz_persist/indexed/columnar/
arrow.rs1use std::ptr::NonNull;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
17use arrow::buffer::{BooleanBuffer, Buffer, NullBuffer};
18use arrow::datatypes::ToByteSlice;
19use mz_dyncfg::Config;
20
21use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
22use crate::indexed::encoding::BlobTraceUpdates;
23use crate::metrics::ColumnarMetrics;
24
25pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
41 fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
42 Arc::new(a.clone())
43 }
44 let kv = updates
47 .records()
48 .into_iter()
49 .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
50 let td = [
51 ("t", array_ref(updates.timestamps())),
52 ("d", array_ref(updates.diffs())),
53 ];
54 let ks_vs = updates
55 .structured()
56 .into_iter()
57 .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);
58
59 let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
61 RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
62}
63
64pub(crate) const ENABLE_ARROW_LGALLOC_CC_SIZES: Config<bool> = Config::new(
65 "persist_enable_arrow_lgalloc_cc_sizes",
66 true,
67 "An incident flag to disable copying decoded arrow data into lgalloc on cc sized clusters.",
68);
69
70pub(crate) const ENABLE_ARROW_LGALLOC_NONCC_SIZES: Config<bool> = Config::new(
71 "persist_enable_arrow_lgalloc_noncc_sizes",
72 false,
73 "A feature flag to enable copying decoded arrow data into lgalloc on non-cc sized clusters.",
74);
75
76fn realloc_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
77 let buffers = data
82 .buffers()
83 .iter()
84 .map(|b| realloc_buffer(b, metrics))
85 .collect();
86 let child_data = {
87 let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
88 let child_iter = data.child_data().iter();
89 field_iter
90 .zip(child_iter)
91 .map(|(f, d)| realloc_data(d.clone(), f.is_nullable(), metrics))
92 .collect()
93 };
94 let nulls = if nullable {
95 data.nulls().map(|n| {
96 let buffer = realloc_buffer(n.buffer(), metrics);
97 NullBuffer::new(BooleanBuffer::new(buffer, n.offset(), n.len()))
98 })
99 } else {
100 if data.nulls().is_some() {
101 metrics.parquet.elided_null_buffers.inc();
112 }
113 None
114 };
115
116 data.into_builder()
120 .buffers(buffers)
121 .child_data(child_data)
122 .nulls(nulls)
123 .build()
124 .expect("reconstructing valid arrow array")
125}
126
127pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
130 let data = array.to_data();
131 let data = realloc_data(data, true, metrics);
133 A::from(data)
134}
135
136pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
139 let data = array.into_data();
140 let data = realloc_data(data, true, metrics);
142 make_array(data)
143}
144
145fn realloc_buffer(buffer: &Buffer, metrics: &ColumnarMetrics) -> Buffer {
146 let use_lgbytes_mmap = if metrics.is_cc_active {
147 ENABLE_ARROW_LGALLOC_CC_SIZES.get(&metrics.cfg)
148 } else {
149 ENABLE_ARROW_LGALLOC_NONCC_SIZES.get(&metrics.cfg)
150 };
151 let region = if use_lgbytes_mmap {
152 metrics
153 .lgbytes_arrow
154 .try_mmap_region(buffer.as_slice())
155 .ok()
156 } else {
157 None
158 };
159 let Some(region) = region else {
160 return buffer.clone();
161 };
162 let bytes: &[u8] = region.as_ref().to_byte_slice();
163 let ptr: NonNull<[u8]> = bytes.into();
164 let ptr: NonNull<u8> = ptr.cast();
166 unsafe { Buffer::from_custom_allocation(ptr, bytes.len(), Arc::new(region)) }
168}
169
170pub fn decode_arrow_batch(
172 batch: &RecordBatch,
173 metrics: &ColumnarMetrics,
174) -> anyhow::Result<BlobTraceUpdates> {
175 fn try_downcast<A: Array + From<ArrayData> + 'static>(
176 batch: &RecordBatch,
177 name: &'static str,
178 metrics: &ColumnarMetrics,
179 ) -> anyhow::Result<Option<A>> {
180 let Some(array_ref) = batch.column_by_name(name) else {
181 return Ok(None);
182 };
183 let col_ref = array_ref
184 .as_any()
185 .downcast_ref::<A>()
186 .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
187 let col = realloc_array(col_ref, metrics);
188 Ok(Some(col))
189 }
190
191 let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
192 let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
193 let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
194 .ok_or_else(|| anyhow!("missing timestamp column"))?;
195 let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
196 .ok_or_else(|| anyhow!("missing diff column"))?;
197 let structured_key = batch
198 .column_by_name("k_s")
199 .map(|a| realloc_any(Arc::clone(a), metrics));
200 let structured_val = batch
201 .column_by_name("v_s")
202 .map(|a| realloc_any(Arc::clone(a), metrics));
203
204 let updates = match (codec_key, codec_val, structured_key, structured_val) {
205 (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
206 BlobTraceUpdates::Both(
207 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
208 ColumnarRecordsStructuredExt {
209 key: structured_key,
210 val: structured_val,
211 },
212 )
213 }
214 (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
215 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
216 ),
217 (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
218 key_values: ColumnarRecordsStructuredExt {
219 key: structured_key,
220 val: structured_val,
221 },
222 timestamps,
223 diffs,
224 },
225 (k, v, ks, vs) => {
226 anyhow::bail!(
227 "unexpected mix of key/value columns: k={:?}, v={}, k_s={}, v_s={}",
228 k.is_some(),
229 v.is_some(),
230 ks.is_some(),
231 vs.is_some(),
232 );
233 }
234 };
235
236 Ok(updates)
237}