1use std::fmt;
14use std::mem::size_of;
15
16use ::arrow::array::{
17 Array, ArrayRef, AsArray, BinaryArray, BinaryBuilder, Int64Array, make_array,
18};
19use ::arrow::datatypes::ToByteSlice;
20use mz_persist_types::arrow::{ArrayOrd, ProtoArrayData};
21use mz_proto::{ProtoType, RustType, TryFromProtoError};
22
23use crate::indexed::columnar::arrow::realloc_array;
24use crate::metrics::ColumnarMetrics;
25
26pub mod arrow;
27pub mod parquet;
28
/// Upper limit, in bytes, on the encoded size (value bytes plus offset bytes)
/// of the key column, and separately of the val column, of a single
/// [`ColumnarRecords`].
///
/// `ColumnarRecords::validate` rejects batches above this limit and
/// `ColumnarRecordsBuilder::push` refuses records that would not fit under it.
// The cast only widens a non-negative `i32`, so it is lossless wherever
// `usize` is at least 32 bits wide.
#[allow(clippy::as_conversions)]
pub const KEY_VAL_DATA_MAX_LEN: usize = i32::MAX as usize;

/// Number of bytes taken by one key or val offset (the offsets of a
/// `BinaryArray` are `i32`s).
const BYTES_PER_KEY_VAL_OFFSET: usize = size_of::<i32>();
49
50#[derive(Clone, PartialEq)]
76pub struct ColumnarRecords {
77 len: usize,
78 key_data: BinaryArray,
79 val_data: BinaryArray,
80 timestamps: Int64Array,
81 diffs: Int64Array,
82}
83
84impl Default for ColumnarRecords {
85 fn default() -> Self {
86 Self {
87 len: 0,
88 key_data: BinaryArray::from_vec(vec![]),
89 val_data: BinaryArray::from_vec(vec![]),
90 timestamps: Int64Array::from_iter_values([]),
91 diffs: Int64Array::from_iter_values([]),
92 }
93 }
94}
95
96impl fmt::Debug for ColumnarRecords {
97 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
98 fmt.debug_list().entries(self.iter()).finish()
99 }
100}
101
102impl ColumnarRecords {
103 pub fn new(
105 key_data: BinaryArray,
106 val_data: BinaryArray,
107 timestamps: Int64Array,
108 diffs: Int64Array,
109 ) -> Self {
110 let len = key_data.len();
111 let records = Self {
112 len,
113 key_data,
114 val_data,
115 timestamps,
116 diffs,
117 };
118 assert_eq!(records.validate(), Ok(()));
119 records
120 }
121
122 pub fn len(&self) -> usize {
125 self.len
126 }
127
128 pub fn keys(&self) -> &BinaryArray {
130 &self.key_data
131 }
132
133 pub fn vals(&self) -> &BinaryArray {
135 &self.val_data
136 }
137
138 pub fn timestamps(&self) -> &Int64Array {
140 &self.timestamps
141 }
142
143 pub fn diffs(&self) -> &Int64Array {
145 &self.diffs
146 }
147
148 pub fn goodbytes(&self) -> usize {
151 self.key_data.values().len()
152 + self.val_data.values().len()
153 + self.timestamps.values().inner().len()
154 + self.diffs.values().inner().len()
155 }
156
157 pub fn get(&self, idx: usize) -> Option<((&[u8], &[u8]), [u8; 8], [u8; 8])> {
161 if idx >= self.len {
162 return None;
163 }
164
165 let key = self.key_data.value(idx);
169 let val = self.val_data.value(idx);
170 let ts = i64::to_le_bytes(self.timestamps.values()[idx]);
171 let diff = i64::to_le_bytes(self.diffs.values()[idx]);
172 Some(((key, val), ts, diff))
173 }
174
175 pub fn iter(
177 &self,
178 ) -> impl Iterator<Item = ((&[u8], &[u8]), [u8; 8], [u8; 8])> + ExactSizeIterator {
179 (0..self.len).map(move |idx| self.get(idx).unwrap())
180 }
181
182 pub fn concat(records: &[ColumnarRecords], metrics: &ColumnarMetrics) -> ColumnarRecords {
184 match records.len() {
185 0 => return ColumnarRecords::default(),
186 1 => return records[0].clone(),
187 _ => {}
188 }
189
190 let mut concat_array = vec![];
191 let mut concat = |get: fn(&ColumnarRecords) -> &dyn Array| {
192 concat_array.extend(records.iter().map(get));
193 let res = ::arrow::compute::concat(&concat_array).expect("same type");
194 concat_array.clear();
195 res
196 };
197
198 Self {
199 len: records.iter().map(|c| c.len).sum(),
200 key_data: realloc_array(concat(|c| &c.key_data).as_binary(), metrics),
201 val_data: realloc_array(concat(|c| &c.val_data).as_binary(), metrics),
202 timestamps: realloc_array(concat(|c| &c.timestamps).as_primitive(), metrics),
203 diffs: realloc_array(concat(|c| &c.diffs).as_primitive(), metrics),
204 }
205 }
206}
207
208#[derive(Debug)]
211pub struct ColumnarRecordsBuilder {
212 len: usize,
213 key_data: BinaryBuilder,
214 val_data: BinaryBuilder,
215 timestamps: Vec<i64>,
216 diffs: Vec<i64>,
217}
218
219impl Default for ColumnarRecordsBuilder {
220 fn default() -> Self {
221 ColumnarRecordsBuilder {
222 len: 0,
223 key_data: BinaryBuilder::new(),
224 val_data: BinaryBuilder::new(),
225 timestamps: Vec::new(),
226 diffs: Vec::new(),
227 }
228 }
229}
230
231impl ColumnarRecordsBuilder {
232 pub fn with_capacity(items: usize, key_bytes: usize, val_bytes: usize) -> Self {
235 let key_data = BinaryBuilder::with_capacity(items, key_bytes);
236 let val_data = BinaryBuilder::with_capacity(items, val_bytes);
237 let timestamps = Vec::with_capacity(items);
238 let diffs = Vec::with_capacity(items);
239 Self {
240 len: 0,
241 key_data,
242 val_data,
243 timestamps,
244 diffs,
245 }
246 }
247
248 pub fn len(&self) -> usize {
251 self.len
252 }
253
254 pub fn can_fit(&self, key: &[u8], val: &[u8], limit: usize) -> bool {
260 let key_data_size = self.key_data.values_slice().len()
261 + self.key_data.offsets_slice().to_byte_slice().len()
262 + key.len();
263 let val_data_size = self.val_data.values_slice().len()
264 + self.val_data.offsets_slice().to_byte_slice().len()
265 + val.len();
266 key_data_size <= limit && val_data_size <= limit
267 }
268
269 pub fn total_bytes(&self) -> usize {
272 self.key_data.values_slice().len()
273 + self.key_data.offsets_slice().to_byte_slice().len()
274 + self.val_data.values_slice().len()
275 + self.val_data.offsets_slice().to_byte_slice().len()
276 + self.timestamps.to_byte_slice().len()
277 + self.diffs.to_byte_slice().len()
278 }
279
280 #[must_use]
286 pub fn push(&mut self, record: ((&[u8], &[u8]), [u8; 8], [u8; 8])) -> bool {
287 let ((key, val), ts, diff) = record;
288
289 if !self.can_fit(key, val, KEY_VAL_DATA_MAX_LEN) {
292 return false;
293 }
294
295 self.key_data.append_value(key);
296 self.val_data.append_value(val);
297 self.timestamps.push(i64::from_le_bytes(ts));
298 self.diffs.push(i64::from_le_bytes(diff));
299 self.len += 1;
300
301 true
302 }
303
304 pub fn finish(mut self, _metrics: &ColumnarMetrics) -> ColumnarRecords {
306 let ret = ColumnarRecords {
310 len: self.len,
311 key_data: BinaryBuilder::finish(&mut self.key_data),
312 val_data: BinaryBuilder::finish(&mut self.val_data),
313 timestamps: self.timestamps.into(),
314 diffs: self.diffs.into(),
315 };
316 debug_assert_eq!(ret.validate(), Ok(()));
317 ret
318 }
319
320 pub fn columnar_record_size(key_bytes_len: usize, value_bytes_len: usize) -> usize {
322 (key_bytes_len + BYTES_PER_KEY_VAL_OFFSET)
323 + (value_bytes_len + BYTES_PER_KEY_VAL_OFFSET)
324 + (2 * size_of::<u64>()) }
326}
327
328impl ColumnarRecords {
329 fn validate(&self) -> Result<(), String> {
330 let validate_array = |name: &str, array: &dyn Array| {
331 let len = array.len();
332 if len != self.len {
333 return Err(format!("expected {} {name} got {len}", self.len));
334 }
335 let null_count = array.null_count();
336 if null_count > 0 {
337 return Err(format!("{null_count} unexpected nulls in {name} array"));
338 }
339 Ok(())
340 };
341
342 let key_data_size =
343 self.key_data.values().len() + self.key_data.offsets().inner().inner().len();
344 if key_data_size > KEY_VAL_DATA_MAX_LEN {
345 return Err(format!(
346 "expected encoded key offsets and data size to be less than or equal to {} got {}",
347 KEY_VAL_DATA_MAX_LEN, key_data_size
348 ));
349 }
350 validate_array("keys", &self.key_data)?;
351
352 let val_data_size =
353 self.val_data.values().len() + self.val_data.offsets().inner().inner().len();
354 if val_data_size > KEY_VAL_DATA_MAX_LEN {
355 return Err(format!(
356 "expected encoded val offsets and data size to be less than or equal to {} got {}",
357 KEY_VAL_DATA_MAX_LEN, val_data_size
358 ));
359 }
360 validate_array("vals", &self.val_data)?;
361
362 if self.diffs.len() != self.len {
363 return Err(format!(
364 "expected {} diffs got {}",
365 self.len,
366 self.diffs.len()
367 ));
368 }
369 if self.timestamps.len() != self.len {
370 return Err(format!(
371 "expected {} timestamps got {}",
372 self.len,
373 self.timestamps.len()
374 ));
375 }
376
377 Ok(())
378 }
379}
380
381#[derive(Debug, Clone)]
390pub struct ColumnarRecordsStructuredExt {
391 pub key: ArrayRef,
397 pub val: ArrayRef,
403}
404
405impl PartialEq for ColumnarRecordsStructuredExt {
406 fn eq(&self, other: &Self) -> bool {
407 *self.key == *other.key && *self.val == *other.val
408 }
409}
410
411impl ColumnarRecordsStructuredExt {
412 pub fn into_proto(&self) -> (ProtoArrayData, ProtoArrayData) {
414 let key = self.key.to_data().into_proto();
415 let val = self.val.to_data().into_proto();
416
417 (key, val)
418 }
419
420 pub fn from_proto(
422 key: Option<ProtoArrayData>,
423 val: Option<ProtoArrayData>,
424 ) -> Result<Option<Self>, TryFromProtoError> {
425 let key = key.map(|d| d.into_rust()).transpose()?.map(make_array);
426 let val = val.map(|d| d.into_rust()).transpose()?.map(make_array);
427
428 let ext = match (key, val) {
429 (Some(key), Some(val)) => Some(ColumnarRecordsStructuredExt { key, val }),
430 x @ (Some(_), None) | x @ (None, Some(_)) => {
431 mz_ore::soft_panic_or_log!("found only one of key or val, {x:?}");
432 None
433 }
434 (None, None) => None,
435 };
436 Ok(ext)
437 }
438
439 pub fn goodbytes(&self) -> usize {
441 ArrayOrd::new(self.key.as_ref()).goodbytes() + ArrayOrd::new(self.val.as_ref()).goodbytes()
442 }
443}
444
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec64;

    use super::*;

    /// Round-trips records through the builder and checks they read back
    /// unchanged, for both an empty and a non-empty batch.
    #[mz_ore::test]
    fn columnar_records() {
        let metrics = ColumnarMetrics::disconnected();

        // A builder that never saw a record finishes into an empty batch.
        let empty = ColumnarRecordsBuilder::default().finish(&metrics);
        let reads: Vec<_> = empty.iter().collect();
        assert_eq!(reads, vec![]);

        // Records with empty keys and vals must survive the round trip.
        let updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)> = vec![
            ((Vec::new(), Vec::new()), 0, 0),
            ((Vec::new(), Vec::new()), 1, 1),
        ];
        let mut builder = ColumnarRecordsBuilder::default();
        for ((key, val), time, diff) in &updates {
            let pushed = builder.push(((key, val), u64::encode(time), i64::encode(diff)));
            assert!(pushed);
        }

        let records = builder.finish(&metrics);
        let mut reads = Vec::new();
        for ((k, v), t, d) in records.iter() {
            reads.push(((k.to_vec(), v.to_vec()), u64::decode(t), i64::decode(d)));
        }
        assert_eq!(reads, updates);
    }
}