mz_persist/indexed/columnar/
arrow.rs1use std::sync::Arc;
13
14use anyhow::anyhow;
15use arrow::array::{Array, ArrayData, ArrayRef, BinaryArray, Int64Array, RecordBatch, make_array};
16use arrow::buffer::{BooleanBuffer, NullBuffer};
17use itertools::Itertools;
18
19use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
20use crate::indexed::encoding::BlobTraceUpdates;
21use crate::metrics::ColumnarMetrics;
22
23pub fn encode_arrow_batch(updates: &BlobTraceUpdates) -> RecordBatch {
39 fn array_ref<A: Array + Clone + 'static>(a: &A) -> ArrayRef {
40 Arc::new(a.clone())
41 }
42 let kv = updates
45 .records()
46 .into_iter()
47 .flat_map(|x| [("k", array_ref(&x.key_data)), ("v", array_ref(&x.val_data))]);
48 let td = [
49 ("t", array_ref(updates.timestamps())),
50 ("d", array_ref(updates.diffs())),
51 ];
52 let ks_vs = updates
53 .structured()
54 .into_iter()
55 .flat_map(|x| [("k_s", Arc::clone(&x.key)), ("v_s", Arc::clone(&x.val))]);
56
57 let fields = kv.chain(td).chain(ks_vs).map(|(f, a)| (f, a, false));
59 RecordBatch::try_from_iter_with_nullable(fields).expect("valid field definitions")
60}
61
62fn rebuild_data(data: ArrayData, nullable: bool, metrics: &ColumnarMetrics) -> ArrayData {
72 let buffers = data.buffers().to_vec();
73 let child_data = {
74 let field_iter = mz_persist_types::arrow::fields_for_type(data.data_type()).iter();
75 let child_iter = data.child_data().iter();
76 field_iter
77 .zip_eq(child_iter)
78 .map(|(f, d)| rebuild_data(d.clone(), f.is_nullable(), metrics))
79 .collect()
80 };
81 let nulls = if nullable {
82 data.nulls()
83 .map(|n| NullBuffer::new(BooleanBuffer::new(n.buffer().clone(), n.offset(), n.len())))
84 } else {
85 if data.nulls().is_some() {
86 metrics.parquet.elided_null_buffers.inc();
87 }
88 None
89 };
90
91 data.into_builder()
95 .buffers(buffers)
96 .child_data(child_data)
97 .nulls(nulls)
98 .build()
99 .expect("reconstructing valid arrow array")
100}
101
102pub fn realloc_array<A: Array + From<ArrayData>>(array: &A, metrics: &ColumnarMetrics) -> A {
104 let data = array.to_data();
105 let data = rebuild_data(data, true, metrics);
107 A::from(data)
108}
109
110pub fn realloc_any(array: ArrayRef, metrics: &ColumnarMetrics) -> ArrayRef {
112 let data = array.into_data();
113 let data = rebuild_data(data, true, metrics);
115 make_array(data)
116}
117
118pub fn decode_arrow_batch(
120 batch: &RecordBatch,
121 metrics: &ColumnarMetrics,
122) -> anyhow::Result<BlobTraceUpdates> {
123 fn try_downcast<A: Array + From<ArrayData> + 'static>(
124 batch: &RecordBatch,
125 name: &'static str,
126 metrics: &ColumnarMetrics,
127 ) -> anyhow::Result<Option<A>> {
128 let Some(array_ref) = batch.column_by_name(name) else {
129 return Ok(None);
130 };
131 let col_ref = array_ref
132 .as_any()
133 .downcast_ref::<A>()
134 .ok_or_else(|| anyhow!("wrong datatype for column {}", name))?;
135 let col = realloc_array(col_ref, metrics);
136 Ok(Some(col))
137 }
138
139 let codec_key = try_downcast::<BinaryArray>(batch, "k", metrics)?;
140 let codec_val = try_downcast::<BinaryArray>(batch, "v", metrics)?;
141 let timestamps = try_downcast::<Int64Array>(batch, "t", metrics)?
142 .ok_or_else(|| anyhow!("missing timestamp column"))?;
143 let diffs = try_downcast::<Int64Array>(batch, "d", metrics)?
144 .ok_or_else(|| anyhow!("missing diff column"))?;
145 let structured_key = batch
146 .column_by_name("k_s")
147 .map(|a| realloc_any(Arc::clone(a), metrics));
148 let structured_val = batch
149 .column_by_name("v_s")
150 .map(|a| realloc_any(Arc::clone(a), metrics));
151
152 let updates = match (codec_key, codec_val, structured_key, structured_val) {
153 (Some(codec_key), Some(codec_val), Some(structured_key), Some(structured_val)) => {
154 BlobTraceUpdates::Both(
155 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
156 ColumnarRecordsStructuredExt {
157 key: structured_key,
158 val: structured_val,
159 },
160 )
161 }
162 (Some(codec_key), Some(codec_val), None, None) => BlobTraceUpdates::Row(
163 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
164 ),
165 (None, None, Some(structured_key), Some(structured_val)) => BlobTraceUpdates::Structured {
166 key_values: ColumnarRecordsStructuredExt {
167 key: structured_key,
168 val: structured_val,
169 },
170 timestamps,
171 diffs,
172 },
173 (k, v, ks, vs) => {
174 anyhow::bail!(
175 "unexpected mix of key/value columns: k={:?}, v={}, k_s={}, v_s={}",
176 k.is_some(),
177 v.is_some(),
178 ks.is_some(),
179 vs.is_some(),
180 );
181 }
182 };
183
184 Ok(updates)
185}