mz_persist_types/
part.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! A columnar representation of one blob's worth of data.
11
12use std::mem;
13use std::sync::Arc;
14
15use arrow::array::{Array, ArrayRef, AsArray, Int64Array};
16use arrow::datatypes::ToByteSlice;
17use mz_ore::result::ResultExt;
18
19use crate::Codec64;
20use crate::arrow::{ArrayIdx, ArrayOrd};
21use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
22
23/// A structured columnar representation of one blob's worth of data.
24#[derive(Debug, Clone)]
25pub struct Part {
26    /// The 'k' values from a Part, generally `SourceData`.
27    pub key: Arc<dyn Array>,
28    /// The 'v' values from a Part, generally `()`.
29    pub val: Arc<dyn Array>,
30    /// The `ts` values from a Part.
31    pub time: Int64Array,
32    /// The `diff` values from a Part.
33    pub diff: Int64Array,
34}
35
36impl Part {
37    /// The length of each of the arrays in the part.
38    pub fn len(&self) -> usize {
39        self.key.len()
40    }
41
42    /// See [ArrayOrd::goodbytes].
43    pub fn goodbytes(&self) -> usize {
44        ArrayOrd::new(&self.key).goodbytes()
45            + ArrayOrd::new(&self.val).goodbytes()
46            + self.time.values().to_byte_slice().len()
47            + self.diff.values().to_byte_slice().len()
48    }
49
50    fn combine(
51        parts: &[Part],
52        mut combine_fn: impl FnMut(&[&dyn Array]) -> anyhow::Result<ArrayRef>,
53    ) -> anyhow::Result<Self> {
54        let mut field_array = Vec::with_capacity(parts.len());
55        let mut combine = |get: fn(&Part) -> &dyn Array| {
56            field_array.extend(parts.iter().map(get));
57            let res = combine_fn(&field_array);
58            field_array.clear();
59            res
60        };
61
62        Ok(Self {
63            key: combine(|p| &p.key)?,
64            val: combine(|p| &p.val)?,
65            time: combine(|p| &p.time)?.as_primitive().clone(),
66            diff: combine(|p| &p.diff)?.as_primitive().clone(),
67        })
68    }
69
70    /// Executes [::arrow::compute::concat] columnwise, or returns `None` if no parts are given.
71    pub fn concat(parts: &[Part]) -> anyhow::Result<Option<Self>> {
72        match parts.len() {
73            0 => return Ok(None),
74            1 => return Ok(Some(parts[0].clone())),
75            _ => {}
76        }
77        let combined = Part::combine(parts, |cols| ::arrow::compute::concat(cols).err_into())?;
78        Ok(Some(combined))
79    }
80
81    /// Executes [::arrow::compute::interleave] columnwise.
82    pub fn interleave(parts: &[Part], indices: &[(usize, usize)]) -> anyhow::Result<Self> {
83        Part::combine(parts, |cols| {
84            ::arrow::compute::interleave(cols, indices).err_into()
85        })
86    }
87
88    /// Iterate over the contents of this part, decoding as we go.
89    pub fn decode_iter<
90        'a,
91        K: Default + Clone + 'static,
92        V: Default + Clone + 'static,
93        T: Codec64,
94        D: Codec64,
95    >(
96        &'a self,
97        key_schema: &'a impl Schema<K>,
98        val_schema: &'a impl Schema<V>,
99    ) -> anyhow::Result<impl Iterator<Item = ((K, V), T, D)> + 'a> {
100        let key_decoder = key_schema.decoder_any(&*self.key)?;
101        let val_decoder = val_schema.decoder_any(&*self.val)?;
102        let mut key = K::default();
103        let mut val = V::default();
104        let iter = (0..self.len()).map(move |i| {
105            key_decoder.decode(i, &mut key);
106            val_decoder.decode(i, &mut val);
107            let time = T::decode(self.time.value(i).to_le_bytes());
108            let diff = D::decode(self.diff.value(i).to_le_bytes());
109            ((key.clone(), val.clone()), time, diff)
110        });
111        Ok(iter)
112    }
113
114    /// Convert the key/value columns to `ArrayOrd`.
115    pub fn as_ord(&self) -> PartOrd {
116        PartOrd {
117            key: ArrayOrd::new(&*self.key),
118            val: ArrayOrd::new(&*self.val),
119            time: self.time.clone(),
120            diff: self.diff.clone(),
121        }
122    }
123}
124
125/// A part with the key/value arrays downcast to `ArrayOrd` for convenience.
126#[derive(Debug, Clone)]
127pub struct PartOrd {
128    key: ArrayOrd,
129    val: ArrayOrd,
130    time: Int64Array,
131    diff: Int64Array,
132}
133
134impl PartOrd {
135    /// Iterate over the contents of the part in their un-decoded form.
136    pub fn iter(&self) -> impl Iterator<Item = (ArrayIdx<'_>, ArrayIdx<'_>, [u8; 8], [u8; 8])> {
137        (0..self.time.len()).map(move |i| {
138            let key = self.key.at(i);
139            let val = self.val.at(i);
140            let time = self.time.value(i).to_le_bytes();
141            let diff = self.diff.value(i).to_le_bytes();
142            (key, val, time, diff)
143        })
144    }
145}
146
147impl PartialEq for Part {
148    fn eq(&self, other: &Self) -> bool {
149        let Part {
150            key,
151            val,
152            time,
153            diff,
154        } = self;
155        let Part {
156            key: other_key,
157            val: other_val,
158            time: other_time,
159            diff: other_diff,
160        } = other;
161        key == other_key && val == other_val && time == other_time && diff == other_diff
162    }
163}
164
165/// A builder for [`Part`].
166#[derive(Debug)]
167pub struct PartBuilder<K, KS: Schema<K>, V, VS: Schema<V>> {
168    key: KS::Encoder,
169    val: VS::Encoder,
170    time: Codec64Mut,
171    diff: Codec64Mut,
172}
173
174impl<K, KS: Schema<K>, V, VS: Schema<V>> PartBuilder<K, KS, V, VS> {
175    /// Returns a new [`PartBuilder`].
176    pub fn new(key_schema: &KS, val_schema: &VS) -> Self {
177        let key = key_schema.encoder().unwrap();
178        let val = val_schema.encoder().unwrap();
179        let time = Codec64Mut(Vec::new());
180        let diff = Codec64Mut(Vec::new());
181
182        PartBuilder {
183            key,
184            val,
185            time,
186            diff,
187        }
188    }
189
190    /// Estimate the size of the part this builder will build.
191    pub fn goodbytes(&self) -> usize {
192        self.key.goodbytes() + self.val.goodbytes() + self.time.goodbytes() + self.diff.goodbytes()
193    }
194
195    /// Push a new row onto this [`PartBuilder`].
196    pub fn push<T: Codec64, D: Codec64>(&mut self, key: &K, val: &V, t: T, d: D) {
197        self.key.append(key);
198        self.val.append(val);
199        self.time.push(&t);
200        self.diff.push(&d);
201    }
202
203    /// Finishes the builder returning a [`Part`].
204    pub fn finish(self) -> Part {
205        let PartBuilder {
206            key,
207            val,
208            time,
209            diff,
210        } = self;
211
212        let key_col = key.finish();
213        let val_col = val.finish();
214        let time = Int64Array::from(time.0);
215        let diff = Int64Array::from(diff.0);
216
217        Part {
218            key: Arc::new(key_col),
219            val: Arc::new(val_col),
220            time,
221            diff,
222        }
223    }
224
225    /// Finish the builder and replace it with an empty one.
226    pub fn finish_and_replace(&mut self, key_schema: &KS, val_schema: &VS) -> Part {
227        let builder = mem::replace(self, PartBuilder::new(key_schema, val_schema));
228        builder.finish()
229    }
230}
231
232/// Mutable access to a column of a [`Codec64`] implementor.
233#[derive(Debug)]
234pub struct Codec64Mut(Vec<i64>);
235
236impl Codec64Mut {
237    /// Create a builder, pre-sized to an expected number of elements.
238    pub fn with_capacity(capacity: usize) -> Self {
239        Codec64Mut(Vec::with_capacity(capacity))
240    }
241
242    /// Returns the overall size of the stored data in bytes.
243    pub fn goodbytes(&self) -> usize {
244        self.0.len() * size_of::<i64>()
245    }
246
247    /// Returns the length of the column.
248    pub fn len(&self) -> usize {
249        self.0.len()
250    }
251
252    /// Pushes the given value into this column.
253    pub fn push(&mut self, val: &impl Codec64) {
254        self.push_raw(val.encode());
255    }
256
257    /// Pushes the given encoded value into this column.
258    pub fn push_raw(&mut self, val: [u8; 8]) {
259        self.0.push(i64::from_le_bytes(val));
260    }
261
262    /// Return the allocated array.
263    pub fn finish(self) -> Int64Array {
264        self.0.into()
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec_impls::UnitSchema;

    /// Compile-time check that the public part types are `Send + Sync`, so they can be used in
    /// async tasks. Nothing here runs; if this function compiles, the check has passed.
    #[allow(unused)] // never called: the trait bounds are the whole test
    fn sync_send() {
        fn require_send_sync<T: Send + Sync>() {}

        require_send_sync::<Part>();
        require_send_sync::<PartBuilder<(), UnitSchema, (), UnitSchema>>();
    }
}