mz_persist/indexed/columnar.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A columnar representation of ((Key, Val), Time, i64) data suitable for
//! in-memory reads and persistent storage.

use std::fmt;
use std::mem::size_of;

use ::arrow::array::{
    Array, ArrayRef, AsArray, BinaryArray, BinaryBuilder, Int64Array, make_array,
};
use ::arrow::datatypes::ToByteSlice;
use mz_persist_types::arrow::{ArrayOrd, ProtoArrayData};
use mz_proto::{ProtoType, RustType, TryFromProtoError};

use crate::indexed::columnar::arrow::realloc_array;
use crate::metrics::ColumnarMetrics;

pub mod arrow;
pub mod parquet;

/// The maximum allowed amount of total key data (similarly, val data) in a
/// single [ColumnarRecords].
///
/// Note that, somewhat counter-intuitively, this also includes offsets
/// (counting as 4 bytes each) in the definition of "key/val data".
///
/// TODO: The limit on the amount of {key,val} data is because we use i32
/// offsets in parquet; this won't change. However, we include the offsets in
/// the size because the parquet library we use currently maps each Array 1:1
/// with a parquet "page" (so for a "binary" column this is both the offsets
/// and the data). The parquet format internally stores the size of a page in
/// an i32, so if this gets too big, our library overflows it and writes bad
/// data. There's no reason it needs to map an Array 1:1 to a page (it could
/// instead be 1:1 with a "column chunk", which contains 1 or more pages). For
/// now, we work around it.
// TODO(benesch): find a way to express this without `as`.
#[allow(clippy::as_conversions)]
pub const KEY_VAL_DATA_MAX_LEN: usize = i32::MAX as usize;

const BYTES_PER_KEY_VAL_OFFSET: usize = 4;

/// A set of ((Key, Val), Time, Diff) records stored in a columnar
/// representation.
///
/// Note that the data are unsorted and unconsolidated (so there may be
/// multiple instances of the same ((Key, Val), Time), and some Diffs might be
/// zero, or add up to zero).
///
/// Both Time and Diff are presented externally to persist users as a type
/// parameter that implements [mz_persist_types::Codec64]. Our columnar format
/// intentionally stores them both as i64 columns (as opposed to something like
/// a fixed width binary column) because this allows us additional compression
/// options.
///
/// Also note that we intentionally use an i64 over a u64 for Time. Over the
/// range `[0, i64::MAX]`, the bytes are the same, and we've talked at various
/// times about changing Time in mz to an i64. Both millis since the unix epoch
/// and nanos since the unix epoch easily fit into this range (the latter until
/// some time after the year 2200). Using an i64 might be a pessimization for a
/// non-realtime mz source with u64 timestamps in the range `(i64::MAX,
/// u64::MAX]`, but realtime sources are overwhelmingly the common case.
///
/// Invariants:
/// - len <= u32::MAX (since we use arrow's `BinaryArray` for our binary data)
/// - the length of all arrays should == len
/// - all entries should be non-null
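///
/// A minimal sketch of the Time representation described above (the value is
/// hypothetical; `Codec64` encodes `u64`/`i64` as 8 little-endian bytes, which
/// round-trip losslessly for timestamps in `[0, i64::MAX]`):
///
/// ```ignore
/// let ts: u64 = 1_700_000_000_000; // millis since the unix epoch
/// let as_i64 = i64::from_le_bytes(ts.to_le_bytes());
/// assert_eq!(u64::from_le_bytes(as_i64.to_le_bytes()), ts);
/// ```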
#[derive(Clone, PartialEq)]
pub struct ColumnarRecords {
    len: usize,
    key_data: BinaryArray,
    val_data: BinaryArray,
    timestamps: Int64Array,
    diffs: Int64Array,
}

impl Default for ColumnarRecords {
    fn default() -> Self {
        Self {
            len: 0,
            key_data: BinaryArray::from_vec(vec![]),
            val_data: BinaryArray::from_vec(vec![]),
            timestamps: Int64Array::from_iter_values([]),
            diffs: Int64Array::from_iter_values([]),
        }
    }
}

impl fmt::Debug for ColumnarRecords {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_list().entries(self.iter()).finish()
    }
}

impl ColumnarRecords {
    /// Construct a columnar records instance from the given arrays, checking invariants.
    pub fn new(
        key_data: BinaryArray,
        val_data: BinaryArray,
        timestamps: Int64Array,
        diffs: Int64Array,
    ) -> Self {
        let len = key_data.len();
        let records = Self {
            len,
            key_data,
            val_data,
            timestamps,
            diffs,
        };
        assert_eq!(records.validate(), Ok(()));
        records
    }

    /// The number of (potentially duplicated) ((Key, Val), Time, i64) records
    /// stored in Self.
    pub fn len(&self) -> usize {
        self.len
    }

    /// The keys in this columnar records as an array.
    pub fn keys(&self) -> &BinaryArray {
        &self.key_data
    }

    /// The vals in this columnar records as an array.
    pub fn vals(&self) -> &BinaryArray {
        &self.val_data
    }

    /// The timestamps in this columnar records as an array.
    pub fn timestamps(&self) -> &Int64Array {
        &self.timestamps
    }

    /// The diffs in this columnar records as an array.
    pub fn diffs(&self) -> &Int64Array {
        &self.diffs
    }

    /// The number of logical bytes in the represented data, excluding offsets
    /// and lengths.
    pub fn goodbytes(&self) -> usize {
        self.key_data.values().len()
            + self.val_data.values().len()
            + self.timestamps.values().inner().len()
            + self.diffs.values().inner().len()
    }

    /// Read the record at `idx`, if there is one.
    ///
    /// Returns None if `idx >= self.len()`.
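    ///
    /// A sketch of decoding a returned record (`records` is any
    /// [ColumnarRecords]; assumes the `Codec64`-style little-endian
    /// `u64`/`i64` encodings used elsewhere in this file):
    ///
    /// ```ignore
    /// if let Some(((key, val), ts, diff)) = records.get(0) {
    ///     let ts = u64::from_le_bytes(ts);
    ///     let diff = i64::from_le_bytes(diff);
    /// }
    /// ```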
    pub fn get(&self, idx: usize) -> Option<((&[u8], &[u8]), [u8; 8], [u8; 8])> {
        if idx >= self.len {
            return None;
        }

        // There used to be `debug_assert_eq!(self.validate(), Ok(()))`, but it
        // resulted in accidentally O(n^2) behavior in debug mode. Instead, we
        // push that responsibility to the ColumnarRecordsRef constructor.
        let key = self.key_data.value(idx);
        let val = self.val_data.value(idx);
        let ts = i64::to_le_bytes(self.timestamps.values()[idx]);
        let diff = i64::to_le_bytes(self.diffs.values()[idx]);
        Some(((key, val), ts, diff))
    }

    /// Iterate through the records in Self.
    pub fn iter(
        &self,
    ) -> impl Iterator<Item = ((&[u8], &[u8]), [u8; 8], [u8; 8])> + ExactSizeIterator {
        (0..self.len).map(move |idx| self.get(idx).unwrap())
    }

    /// Concatenate the given records together, column-by-column.
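    ///
    /// A minimal usage sketch (`a` and `b` are any [ColumnarRecords];
    /// `ColumnarMetrics::disconnected()` is the metrics-free constructor used
    /// in tests):
    ///
    /// ```ignore
    /// let metrics = ColumnarMetrics::disconnected();
    /// let total = a.len() + b.len();
    /// let merged = ColumnarRecords::concat(&[a, b], &metrics);
    /// assert_eq!(merged.len(), total);
    /// ```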
    pub fn concat(records: &[ColumnarRecords], metrics: &ColumnarMetrics) -> ColumnarRecords {
        match records.len() {
            0 => return ColumnarRecords::default(),
            1 => return records[0].clone(),
            _ => {}
        }

        let mut concat_array = vec![];
        let mut concat = |get: fn(&ColumnarRecords) -> &dyn Array| {
            concat_array.extend(records.iter().map(get));
            let res = ::arrow::compute::concat(&concat_array).expect("same type");
            concat_array.clear();
            res
        };

        Self {
            len: records.iter().map(|c| c.len).sum(),
            key_data: realloc_array(concat(|c| &c.key_data).as_binary(), metrics),
            val_data: realloc_array(concat(|c| &c.val_data).as_binary(), metrics),
            timestamps: realloc_array(concat(|c| &c.timestamps).as_primitive(), metrics),
            diffs: realloc_array(concat(|c| &c.diffs).as_primitive(), metrics),
        }
    }
}

/// An abstraction to incrementally add ((Key, Value), Time, i64) records
/// in a columnar representation, and eventually get back a [ColumnarRecords].
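///
/// A minimal usage sketch (the key/val/time/diff values are hypothetical;
/// `Codec64` encodes `u64`/`i64` as 8 little-endian bytes):
///
/// ```ignore
/// let mut builder = ColumnarRecordsBuilder::default();
/// let record = ((b"k".as_slice(), b"v".as_slice()), 1u64.to_le_bytes(), 1i64.to_le_bytes());
/// assert!(builder.push(record));
/// let records = builder.finish(&ColumnarMetrics::disconnected());
/// assert_eq!(records.len(), 1);
/// ```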
#[derive(Debug)]
pub struct ColumnarRecordsBuilder {
    len: usize,
    key_data: BinaryBuilder,
    val_data: BinaryBuilder,
    timestamps: Vec<i64>,
    diffs: Vec<i64>,
}

impl Default for ColumnarRecordsBuilder {
    fn default() -> Self {
        ColumnarRecordsBuilder {
            len: 0,
            key_data: BinaryBuilder::new(),
            val_data: BinaryBuilder::new(),
            timestamps: Vec::new(),
            diffs: Vec::new(),
        }
    }
}

impl ColumnarRecordsBuilder {
    /// Reserve space for the given number of items with the given sizes in bytes.
    /// If they end up being too small, the underlying buffers will be resized as usual.
    pub fn with_capacity(items: usize, key_bytes: usize, val_bytes: usize) -> Self {
        let key_data = BinaryBuilder::with_capacity(items, key_bytes);
        let val_data = BinaryBuilder::with_capacity(items, val_bytes);
        let timestamps = Vec::with_capacity(items);
        let diffs = Vec::with_capacity(items);
        Self {
            len: 0,
            key_data,
            val_data,
            timestamps,
            diffs,
        }
    }

    /// The number of (potentially duplicated) ((Key, Val), Time, i64) records
    /// stored in Self.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns whether the given key_offsets+key_data and val_offsets+val_data
    /// both fit in the limits imposed by [ColumnarRecords].
    ///
    /// Note that `limit` is always [KEY_VAL_DATA_MAX_LEN] in production; it's
    /// only overridable here for testing.
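    ///
    /// A usage sketch pairing this with [Self::push] (the
    /// `key`/`val`/`ts`/`diff` bindings are hypothetical):
    ///
    /// ```ignore
    /// if builder.can_fit(key, val, KEY_VAL_DATA_MAX_LEN) {
    ///     assert!(builder.push(((key, val), ts, diff)));
    /// }
    /// ```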
    pub fn can_fit(&self, key: &[u8], val: &[u8], limit: usize) -> bool {
        let key_data_size = self.key_data.values_slice().len()
            + self.key_data.offsets_slice().to_byte_slice().len()
            + key.len();
        let val_data_size = self.val_data.values_slice().len()
            + self.val_data.offsets_slice().to_byte_slice().len()
            + val.len();
        key_data_size <= limit && val_data_size <= limit
    }

    /// The current size of the columnar records data, useful for bounding batches at a
    /// target size.
    pub fn total_bytes(&self) -> usize {
        self.key_data.values_slice().len()
            + self.key_data.offsets_slice().to_byte_slice().len()
            + self.val_data.values_slice().len()
            + self.val_data.offsets_slice().to_byte_slice().len()
            + self.timestamps.to_byte_slice().len()
            + self.diffs.to_byte_slice().len()
    }

    /// Add a record to Self.
    ///
    /// Returns whether the record was successfully added. A record will not be
    /// added if it exceeds the size limitations of [ColumnarRecords]. This
    /// method is atomic: if it fails, no partial data will have been added.
    #[must_use]
    pub fn push(&mut self, record: ((&[u8], &[u8]), [u8; 8], [u8; 8])) -> bool {
        let ((key, val), ts, diff) = record;

        // Check size invariants ahead of time so we stay atomic when we can't
        // add the record.
        if !self.can_fit(key, val, KEY_VAL_DATA_MAX_LEN) {
            return false;
        }

        self.key_data.append_value(key);
        self.val_data.append_value(val);
        self.timestamps.push(i64::from_le_bytes(ts));
        self.diffs.push(i64::from_le_bytes(diff));
        self.len += 1;

        true
    }

    /// Finalize constructing a [ColumnarRecords].
    pub fn finish(mut self, _metrics: &ColumnarMetrics) -> ColumnarRecords {
        // We're almost certainly going to immediately encode this and drop it,
        // so don't bother actually copying the data into lgalloc.
        // Revisit if that changes.
        let ret = ColumnarRecords {
            len: self.len,
            key_data: BinaryBuilder::finish(&mut self.key_data),
            val_data: BinaryBuilder::finish(&mut self.val_data),
            timestamps: self.timestamps.into(),
            diffs: self.diffs.into(),
        };
        debug_assert_eq!(ret.validate(), Ok(()));
        ret
    }

    /// Size of an update record as stored in the columnar representation.
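    ///
    /// For example, a 3 byte key and a 5 byte val cost
    /// `(3 + 4) + (5 + 4) + 16 = 32` bytes:
    ///
    /// ```ignore
    /// assert_eq!(ColumnarRecordsBuilder::columnar_record_size(3, 5), 32);
    /// ```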
    pub fn columnar_record_size(key_bytes_len: usize, value_bytes_len: usize) -> usize {
        (key_bytes_len + BYTES_PER_KEY_VAL_OFFSET)
            + (value_bytes_len + BYTES_PER_KEY_VAL_OFFSET)
            + (2 * size_of::<u64>()) // T and D
    }
}

impl ColumnarRecords {
    fn validate(&self) -> Result<(), String> {
        let validate_array = |name: &str, array: &dyn Array| {
            let len = array.len();
            if len != self.len {
                return Err(format!("expected {} {name} got {len}", self.len));
            }
            let null_count = array.null_count();
            if null_count > 0 {
                return Err(format!("{null_count} unexpected nulls in {name} array"));
            }
            Ok(())
        };

        let key_data_size =
            self.key_data.values().len() + self.key_data.offsets().inner().inner().len();
        if key_data_size > KEY_VAL_DATA_MAX_LEN {
            return Err(format!(
                "expected encoded key offsets and data size to be less than or equal to {} got {}",
                KEY_VAL_DATA_MAX_LEN, key_data_size
            ));
        }
        validate_array("keys", &self.key_data)?;

        let val_data_size =
            self.val_data.values().len() + self.val_data.offsets().inner().inner().len();
        if val_data_size > KEY_VAL_DATA_MAX_LEN {
            return Err(format!(
                "expected encoded val offsets and data size to be less than or equal to {} got {}",
                KEY_VAL_DATA_MAX_LEN, val_data_size
            ));
        }
        validate_array("vals", &self.val_data)?;

        if self.diffs.len() != self.len {
            return Err(format!(
                "expected {} diffs got {}",
                self.len,
                self.diffs.len()
            ));
        }
        if self.timestamps.len() != self.len {
            return Err(format!(
                "expected {} timestamps got {}",
                self.len,
                self.timestamps.len()
            ));
        }

        Ok(())
    }
}

/// An "extension" to [`ColumnarRecords`] that duplicates the "key" (`K`) and "val" (`V`) columns
/// as structured Arrow data.
///
/// [`ColumnarRecords`] stores the key and value columns as binary blobs encoded with the [`Codec`]
/// trait. We're migrating to instead store the key and value columns as structured Parquet data,
/// which we interface with via Arrow.
///
/// [`Codec`]: mz_persist_types::Codec
#[derive(Debug, Clone)]
pub struct ColumnarRecordsStructuredExt {
    /// The structured `k` column.
    pub key: ArrayRef,
    /// The structured `v` column.
    pub val: ArrayRef,
}

impl PartialEq for ColumnarRecordsStructuredExt {
    fn eq(&self, other: &Self) -> bool {
        *self.key == *other.key && *self.val == *other.val
    }
}

impl ColumnarRecordsStructuredExt {
    /// See [`RustType::into_proto`].
    pub fn into_proto(&self) -> (ProtoArrayData, ProtoArrayData) {
        let key = self.key.to_data().into_proto();
        let val = self.val.to_data().into_proto();

        (key, val)
    }

    /// See [`RustType::from_proto`].
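    ///
    /// A round-trip sketch with [Self::into_proto] (`ext` is any
    /// [ColumnarRecordsStructuredExt]):
    ///
    /// ```ignore
    /// let (key, val) = ext.into_proto();
    /// let roundtrip = ColumnarRecordsStructuredExt::from_proto(Some(key), Some(val))?
    ///     .expect("both key and val were provided");
    /// assert_eq!(roundtrip, ext);
    /// ```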
    pub fn from_proto(
        key: Option<ProtoArrayData>,
        val: Option<ProtoArrayData>,
    ) -> Result<Option<Self>, TryFromProtoError> {
        let key = key.map(|d| d.into_rust()).transpose()?.map(make_array);
        let val = val.map(|d| d.into_rust()).transpose()?.map(make_array);

        let ext = match (key, val) {
            (Some(key), Some(val)) => Some(ColumnarRecordsStructuredExt { key, val }),
            x @ (Some(_), None) | x @ (None, Some(_)) => {
                mz_ore::soft_panic_or_log!("found only one of key or val, {x:?}");
                None
            }
            (None, None) => None,
        };
        Ok(ext)
    }

    /// The "goodput" of these arrays, excluding overhead like offsets etc.
    pub fn goodbytes(&self) -> usize {
        ArrayOrd::new(self.key.as_ref()).goodbytes() + ArrayOrd::new(self.val.as_ref()).goodbytes()
    }
}

#[cfg(test)]
mod tests {
    use mz_persist_types::Codec64;

    use super::*;

    /// Smoke test some edge cases around empty sets of records and empty keys/vals.
    ///
    /// Most of this functionality is also well-exercised in other unit tests.
    #[mz_ore::test]
    fn columnar_records() {
        let metrics = ColumnarMetrics::disconnected();
        let builder = ColumnarRecordsBuilder::default();

        // Empty builder.
        let records = builder.finish(&metrics);
        let reads: Vec<_> = records.iter().collect();
        assert_eq!(reads, vec![]);

        // Empty key and val.
        let updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)> = vec![
            (("".into(), "".into()), 0, 0),
            (("".into(), "".into()), 1, 1),
        ];
        let mut builder = ColumnarRecordsBuilder::default();
        for ((key, val), time, diff) in updates.iter() {
            assert!(builder.push(((key, val), u64::encode(time), i64::encode(diff))));
        }

        let records = builder.finish(&metrics);
        let reads: Vec<_> = records
            .iter()
            .map(|((k, v), t, d)| ((k.to_vec(), v.to_vec()), u64::decode(t), i64::decode(d)))
            .collect();
        assert_eq!(reads, updates);
    }
}