1use std::fmt;
14use std::mem::size_of;
15
16use ::arrow::array::{
17    Array, ArrayRef, AsArray, BinaryArray, BinaryBuilder, Int64Array, make_array,
18};
19use ::arrow::datatypes::ToByteSlice;
20use mz_persist_types::arrow::{ArrayOrd, ProtoArrayData};
21use mz_proto::{ProtoType, RustType, TryFromProtoError};
22
23use crate::indexed::columnar::arrow::realloc_array;
24use crate::metrics::ColumnarMetrics;
25
26pub mod arrow;
27pub mod parquet;
28
/// Upper bound, in bytes, on the combined size of the values buffer and the
/// offsets buffer of a single key or val column.
///
/// Enforced when a `ColumnarRecords` is validated and when a record is pushed
/// onto a `ColumnarRecordsBuilder`.
// `as` is needed because `TryFrom` is not usable in a const initializer; the
// value is a non-negative `i32`, so widening it to `usize` cannot truncate.
#[allow(clippy::as_conversions)]
pub const KEY_VAL_DATA_MAX_LEN: usize = i32::MAX as usize;

/// Bytes taken by one entry in the offsets buffer of a key or val column.
///
/// `BinaryArray` stores its offsets as `i32`.
const BYTES_PER_KEY_VAL_OFFSET: usize = size_of::<i32>();
49
50#[derive(Clone, PartialEq)]
76pub struct ColumnarRecords {
77    len: usize,
78    key_data: BinaryArray,
79    val_data: BinaryArray,
80    timestamps: Int64Array,
81    diffs: Int64Array,
82}
83
84impl Default for ColumnarRecords {
85    fn default() -> Self {
86        Self {
87            len: 0,
88            key_data: BinaryArray::from_vec(vec![]),
89            val_data: BinaryArray::from_vec(vec![]),
90            timestamps: Int64Array::from_iter_values([]),
91            diffs: Int64Array::from_iter_values([]),
92        }
93    }
94}
95
96impl fmt::Debug for ColumnarRecords {
97    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
98        fmt.debug_list().entries(self.iter()).finish()
99    }
100}
101
102impl ColumnarRecords {
103    pub fn new(
105        key_data: BinaryArray,
106        val_data: BinaryArray,
107        timestamps: Int64Array,
108        diffs: Int64Array,
109    ) -> Self {
110        let len = key_data.len();
111        let records = Self {
112            len,
113            key_data,
114            val_data,
115            timestamps,
116            diffs,
117        };
118        assert_eq!(records.validate(), Ok(()));
119        records
120    }
121
122    pub fn len(&self) -> usize {
125        self.len
126    }
127
128    pub fn keys(&self) -> &BinaryArray {
130        &self.key_data
131    }
132
133    pub fn vals(&self) -> &BinaryArray {
135        &self.val_data
136    }
137
138    pub fn timestamps(&self) -> &Int64Array {
140        &self.timestamps
141    }
142
143    pub fn diffs(&self) -> &Int64Array {
145        &self.diffs
146    }
147
148    pub fn goodbytes(&self) -> usize {
151        self.key_data.values().len()
152            + self.val_data.values().len()
153            + self.timestamps.values().inner().len()
154            + self.diffs.values().inner().len()
155    }
156
157    pub fn get(&self, idx: usize) -> Option<((&[u8], &[u8]), [u8; 8], [u8; 8])> {
161        if idx >= self.len {
162            return None;
163        }
164
165        let key = self.key_data.value(idx);
169        let val = self.val_data.value(idx);
170        let ts = i64::to_le_bytes(self.timestamps.values()[idx]);
171        let diff = i64::to_le_bytes(self.diffs.values()[idx]);
172        Some(((key, val), ts, diff))
173    }
174
175    pub fn iter(
177        &self,
178    ) -> impl Iterator<Item = ((&[u8], &[u8]), [u8; 8], [u8; 8])> + ExactSizeIterator {
179        (0..self.len).map(move |idx| self.get(idx).unwrap())
180    }
181
182    pub fn concat(records: &[ColumnarRecords], metrics: &ColumnarMetrics) -> ColumnarRecords {
184        match records.len() {
185            0 => return ColumnarRecords::default(),
186            1 => return records[0].clone(),
187            _ => {}
188        }
189
190        let mut concat_array = vec![];
191        let mut concat = |get: fn(&ColumnarRecords) -> &dyn Array| {
192            concat_array.extend(records.iter().map(get));
193            let res = ::arrow::compute::concat(&concat_array).expect("same type");
194            concat_array.clear();
195            res
196        };
197
198        Self {
199            len: records.iter().map(|c| c.len).sum(),
200            key_data: realloc_array(concat(|c| &c.key_data).as_binary(), metrics),
201            val_data: realloc_array(concat(|c| &c.val_data).as_binary(), metrics),
202            timestamps: realloc_array(concat(|c| &c.timestamps).as_primitive(), metrics),
203            diffs: realloc_array(concat(|c| &c.diffs).as_primitive(), metrics),
204        }
205    }
206}
207
208#[derive(Debug)]
211pub struct ColumnarRecordsBuilder {
212    len: usize,
213    key_data: BinaryBuilder,
214    val_data: BinaryBuilder,
215    timestamps: Vec<i64>,
216    diffs: Vec<i64>,
217}
218
219impl Default for ColumnarRecordsBuilder {
220    fn default() -> Self {
221        ColumnarRecordsBuilder {
222            len: 0,
223            key_data: BinaryBuilder::new(),
224            val_data: BinaryBuilder::new(),
225            timestamps: Vec::new(),
226            diffs: Vec::new(),
227        }
228    }
229}
230
231impl ColumnarRecordsBuilder {
232    pub fn with_capacity(items: usize, key_bytes: usize, val_bytes: usize) -> Self {
235        let key_data = BinaryBuilder::with_capacity(items, key_bytes);
236        let val_data = BinaryBuilder::with_capacity(items, val_bytes);
237        let timestamps = Vec::with_capacity(items);
238        let diffs = Vec::with_capacity(items);
239        Self {
240            len: 0,
241            key_data,
242            val_data,
243            timestamps,
244            diffs,
245        }
246    }
247
248    pub fn len(&self) -> usize {
251        self.len
252    }
253
254    pub fn can_fit(&self, key: &[u8], val: &[u8], limit: usize) -> bool {
260        let key_data_size = self.key_data.values_slice().len()
261            + self.key_data.offsets_slice().to_byte_slice().len()
262            + key.len();
263        let val_data_size = self.val_data.values_slice().len()
264            + self.val_data.offsets_slice().to_byte_slice().len()
265            + val.len();
266        key_data_size <= limit && val_data_size <= limit
267    }
268
269    pub fn total_bytes(&self) -> usize {
272        self.key_data.values_slice().len()
273            + self.key_data.offsets_slice().to_byte_slice().len()
274            + self.val_data.values_slice().len()
275            + self.val_data.offsets_slice().to_byte_slice().len()
276            + self.timestamps.to_byte_slice().len()
277            + self.diffs.to_byte_slice().len()
278    }
279
280    #[must_use]
286    pub fn push(&mut self, record: ((&[u8], &[u8]), [u8; 8], [u8; 8])) -> bool {
287        let ((key, val), ts, diff) = record;
288
289        if !self.can_fit(key, val, KEY_VAL_DATA_MAX_LEN) {
292            return false;
293        }
294
295        self.key_data.append_value(key);
296        self.val_data.append_value(val);
297        self.timestamps.push(i64::from_le_bytes(ts));
298        self.diffs.push(i64::from_le_bytes(diff));
299        self.len += 1;
300
301        true
302    }
303
304    pub fn finish(mut self, _metrics: &ColumnarMetrics) -> ColumnarRecords {
306        let ret = ColumnarRecords {
310            len: self.len,
311            key_data: BinaryBuilder::finish(&mut self.key_data),
312            val_data: BinaryBuilder::finish(&mut self.val_data),
313            timestamps: self.timestamps.into(),
314            diffs: self.diffs.into(),
315        };
316        debug_assert_eq!(ret.validate(), Ok(()));
317        ret
318    }
319
320    pub fn columnar_record_size(key_bytes_len: usize, value_bytes_len: usize) -> usize {
322        (key_bytes_len + BYTES_PER_KEY_VAL_OFFSET)
323            + (value_bytes_len + BYTES_PER_KEY_VAL_OFFSET)
324            + (2 * size_of::<u64>()) }
326}
327
328impl ColumnarRecords {
329    fn validate(&self) -> Result<(), String> {
330        let validate_array = |name: &str, array: &dyn Array| {
331            let len = array.len();
332            if len != self.len {
333                return Err(format!("expected {} {name} got {len}", self.len));
334            }
335            let null_count = array.null_count();
336            if null_count > 0 {
337                return Err(format!("{null_count} unexpected nulls in {name} array"));
338            }
339            Ok(())
340        };
341
342        let key_data_size =
343            self.key_data.values().len() + self.key_data.offsets().inner().inner().len();
344        if key_data_size > KEY_VAL_DATA_MAX_LEN {
345            return Err(format!(
346                "expected encoded key offsets and data size to be less than or equal to {} got {}",
347                KEY_VAL_DATA_MAX_LEN, key_data_size
348            ));
349        }
350        validate_array("keys", &self.key_data)?;
351
352        let val_data_size =
353            self.val_data.values().len() + self.val_data.offsets().inner().inner().len();
354        if val_data_size > KEY_VAL_DATA_MAX_LEN {
355            return Err(format!(
356                "expected encoded val offsets and data size to be less than or equal to {} got {}",
357                KEY_VAL_DATA_MAX_LEN, val_data_size
358            ));
359        }
360        validate_array("vals", &self.val_data)?;
361
362        if self.diffs.len() != self.len {
363            return Err(format!(
364                "expected {} diffs got {}",
365                self.len,
366                self.diffs.len()
367            ));
368        }
369        if self.timestamps.len() != self.len {
370            return Err(format!(
371                "expected {} timestamps got {}",
372                self.len,
373                self.timestamps.len()
374            ));
375        }
376
377        Ok(())
378    }
379}
380
381#[derive(Debug, Clone)]
390pub struct ColumnarRecordsStructuredExt {
391    pub key: ArrayRef,
397    pub val: ArrayRef,
403}
404
405impl PartialEq for ColumnarRecordsStructuredExt {
406    fn eq(&self, other: &Self) -> bool {
407        *self.key == *other.key && *self.val == *other.val
408    }
409}
410
411impl ColumnarRecordsStructuredExt {
412    pub fn into_proto(&self) -> (ProtoArrayData, ProtoArrayData) {
414        let key = self.key.to_data().into_proto();
415        let val = self.val.to_data().into_proto();
416
417        (key, val)
418    }
419
420    pub fn from_proto(
422        key: Option<ProtoArrayData>,
423        val: Option<ProtoArrayData>,
424    ) -> Result<Option<Self>, TryFromProtoError> {
425        let key = key.map(|d| d.into_rust()).transpose()?.map(make_array);
426        let val = val.map(|d| d.into_rust()).transpose()?.map(make_array);
427
428        let ext = match (key, val) {
429            (Some(key), Some(val)) => Some(ColumnarRecordsStructuredExt { key, val }),
430            x @ (Some(_), None) | x @ (None, Some(_)) => {
431                mz_ore::soft_panic_or_log!("found only one of key or val, {x:?}");
432                None
433            }
434            (None, None) => None,
435        };
436        Ok(ext)
437    }
438
439    pub fn goodbytes(&self) -> usize {
441        ArrayOrd::new(self.key.as_ref()).goodbytes() + ArrayOrd::new(self.val.as_ref()).goodbytes()
442    }
443}
444
#[cfg(test)]
mod tests {
    use mz_persist_types::Codec64;

    use super::*;

    /// Smoke test: an empty builder produces no records, and records pushed
    /// through a builder are read back unchanged and in order.
    #[mz_ore::test]
    fn columnar_records() {
        let metrics = ColumnarMetrics::disconnected();

        // Finishing a builder nothing was pushed to yields an empty batch.
        let empty = ColumnarRecordsBuilder::default().finish(&metrics);
        let reads: Vec<_> = empty.iter().collect();
        assert_eq!(reads, vec![]);

        // Two records with empty keys and vals, including a zero diff.
        let updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)> = vec![
            (("".into(), "".into()), 0, 0),
            (("".into(), "".into()), 1, 1),
        ];
        let mut builder = ColumnarRecordsBuilder::default();
        for ((key, val), time, diff) in &updates {
            assert!(builder.push(((key, val), u64::encode(time), i64::encode(diff))));
        }

        // Decode every record back into owned values and compare.
        let records = builder.finish(&metrics);
        let mut reads = Vec::with_capacity(records.len());
        for ((k, v), t, d) in records.iter() {
            reads.push(((k.to_vec(), v.to_vec()), u64::decode(t), i64::decode(d)));
        }
        assert_eq!(reads, updates);
    }
}
481}