mz_persist_types/
part.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A columnar representation of one blob's worth of data
11
12use std::mem;
13use std::sync::Arc;
14
15use arrow::array::{Array, ArrayRef, AsArray, Int64Array};
16use arrow::datatypes::ToByteSlice;
17use mz_ore::result::ResultExt;
18
19use crate::Codec64;
20use crate::arrow::ArrayOrd;
21use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
22
23/// A structured columnar representation of one blob's worth of data.
24#[derive(Debug, Clone)]
25pub struct Part {
26    /// The 'k' values from a Part, generally `SourceData`.
27    pub key: Arc<dyn Array>,
28    /// The 'v' values from a Part, generally `()`.
29    pub val: Arc<dyn Array>,
30    /// The `ts` values from a Part.
31    pub time: Int64Array,
32    /// The `diff` values from a Part.
33    pub diff: Int64Array,
34}
35
36impl Part {
37    /// The length of each of the arrays in the part.
38    pub fn len(&self) -> usize {
39        self.key.len()
40    }
41
42    /// See [ArrayOrd::goodbytes].
43    pub fn goodbytes(&self) -> usize {
44        ArrayOrd::new(&self.key).goodbytes()
45            + ArrayOrd::new(&self.val).goodbytes()
46            + self.time.values().to_byte_slice().len()
47            + self.diff.values().to_byte_slice().len()
48    }
49
50    fn combine(
51        parts: &[Part],
52        mut combine_fn: impl FnMut(&[&dyn Array]) -> anyhow::Result<ArrayRef>,
53    ) -> anyhow::Result<Self> {
54        let mut field_array = Vec::with_capacity(parts.len());
55        let mut combine = |get: fn(&Part) -> &dyn Array| {
56            field_array.extend(parts.iter().map(get));
57            let res = combine_fn(&field_array);
58            field_array.clear();
59            res
60        };
61
62        Ok(Self {
63            key: combine(|p| &p.key)?,
64            val: combine(|p| &p.val)?,
65            time: combine(|p| &p.time)?.as_primitive().clone(),
66            diff: combine(|p| &p.diff)?.as_primitive().clone(),
67        })
68    }
69
70    /// Executes [::arrow::compute::concat] columnwise, or returns `None` if no parts are given.
71    pub fn concat(parts: &[Part]) -> anyhow::Result<Option<Self>> {
72        match parts.len() {
73            0 => return Ok(None),
74            1 => return Ok(Some(parts[0].clone())),
75            _ => {}
76        }
77        let combined = Part::combine(parts, |cols| ::arrow::compute::concat(cols).err_into())?;
78        Ok(Some(combined))
79    }
80
81    /// Executes [::arrow::compute::interleave] columnwise.
82    pub fn interleave(parts: &[Part], indices: &[(usize, usize)]) -> anyhow::Result<Self> {
83        Part::combine(parts, |cols| {
84            ::arrow::compute::interleave(cols, indices).err_into()
85        })
86    }
87
88    /// Iterate over the contents of this part, decoding as we go.
89    pub fn decode_iter<
90        'a,
91        K: Default + Clone + 'static,
92        V: Default + Clone + 'static,
93        T: Codec64,
94        D: Codec64,
95    >(
96        &'a self,
97        key_schema: &'a impl Schema<K>,
98        val_schema: &'a impl Schema<V>,
99    ) -> anyhow::Result<impl Iterator<Item = ((K, V), T, D)> + 'a> {
100        let key_decoder = key_schema.decoder_any(&*self.key)?;
101        let val_decoder = val_schema.decoder_any(&*self.val)?;
102        let mut key = K::default();
103        let mut val = V::default();
104        let iter = (0..self.len()).map(move |i| {
105            key_decoder.decode(i, &mut key);
106            val_decoder.decode(i, &mut val);
107            let time = T::decode(self.time.value(i).to_le_bytes());
108            let diff = D::decode(self.diff.value(i).to_le_bytes());
109            ((key.clone(), val.clone()), time, diff)
110        });
111        Ok(iter)
112    }
113}
114
115impl PartialEq for Part {
116    fn eq(&self, other: &Self) -> bool {
117        let Part {
118            key,
119            val,
120            time,
121            diff,
122        } = self;
123        let Part {
124            key: other_key,
125            val: other_val,
126            time: other_time,
127            diff: other_diff,
128        } = other;
129        key == other_key && val == other_val && time == other_time && diff == other_diff
130    }
131}
132
133/// A builder for [`Part`].
134#[derive(Debug)]
135pub struct PartBuilder<K, KS: Schema<K>, V, VS: Schema<V>> {
136    key: KS::Encoder,
137    val: VS::Encoder,
138    time: Codec64Mut,
139    diff: Codec64Mut,
140}
141
142impl<K, KS: Schema<K>, V, VS: Schema<V>> PartBuilder<K, KS, V, VS> {
143    /// Returns a new [`PartBuilder`].
144    pub fn new(key_schema: &KS, val_schema: &VS) -> Self {
145        let key = key_schema.encoder().unwrap();
146        let val = val_schema.encoder().unwrap();
147        let time = Codec64Mut(Vec::new());
148        let diff = Codec64Mut(Vec::new());
149
150        PartBuilder {
151            key,
152            val,
153            time,
154            diff,
155        }
156    }
157
158    /// Estimate the size of the part this builder will build.
159    pub fn goodbytes(&self) -> usize {
160        self.key.goodbytes() + self.val.goodbytes() + self.time.goodbytes() + self.diff.goodbytes()
161    }
162
163    /// Push a new row onto this [`PartBuilder`].
164    pub fn push<T: Codec64, D: Codec64>(&mut self, key: &K, val: &V, t: T, d: D) {
165        self.key.append(key);
166        self.val.append(val);
167        self.time.push(&t);
168        self.diff.push(&d);
169    }
170
171    /// Finishes the builder returning a [`Part`].
172    pub fn finish(self) -> Part {
173        let PartBuilder {
174            key,
175            val,
176            time,
177            diff,
178        } = self;
179
180        let key_col = key.finish();
181        let val_col = val.finish();
182        let time = Int64Array::from(time.0);
183        let diff = Int64Array::from(diff.0);
184
185        Part {
186            key: Arc::new(key_col),
187            val: Arc::new(val_col),
188            time,
189            diff,
190        }
191    }
192
193    /// Finish the builder and replace it with an empty one.
194    pub fn finish_and_replace(&mut self, key_schema: &KS, val_schema: &VS) -> Part {
195        let builder = mem::replace(self, PartBuilder::new(key_schema, val_schema));
196        builder.finish()
197    }
198}
199
200/// Mutable access to a column of a [`Codec64`] implementor.
201#[derive(Debug)]
202pub struct Codec64Mut(Vec<i64>);
203
204impl Codec64Mut {
205    /// Create a builder, pre-sized to an expected number of elements.
206    pub fn with_capacity(capacity: usize) -> Self {
207        Codec64Mut(Vec::with_capacity(capacity))
208    }
209
210    /// Returns the overall size of the stored data in bytes.
211    pub fn goodbytes(&self) -> usize {
212        self.0.len() * size_of::<i64>()
213    }
214
215    /// Returns the length of the column.
216    pub fn len(&self) -> usize {
217        self.0.len()
218    }
219
220    /// Pushes the given value into this column.
221    pub fn push(&mut self, val: &impl Codec64) {
222        self.push_raw(val.encode());
223    }
224
225    /// Pushes the given encoded value into this column.
226    pub fn push_raw(&mut self, val: [u8; 8]) {
227        self.0.push(i64::from_le_bytes(val));
228    }
229
230    /// Return the allocated array.
231    pub fn finish(self) -> Int64Array {
232        self.0.into()
233    }
234}
235
#[cfg(test)]
mod tests {
    use std::marker::PhantomData;

    use super::*;
    use crate::codec_impls::UnitSchema;

    // The API structs must be Sync + Send so that they can be used in async tasks.
    // NOTE: This check happens entirely at compile time. If this module builds, it passes.
    #[allow(unused)]
    fn sync_send() {
        fn is_send_sync<T>(_: PhantomData<T>) -> bool
        where
            T: Send + Sync,
        {
            true
        }

        type UnitBuilder = PartBuilder<(), UnitSchema, (), UnitSchema>;

        let part_ok = is_send_sync::<Part>(PhantomData);
        let builder_ok = is_send_sync::<UnitBuilder>(PhantomData);
        assert!(part_ok);
        assert!(builder_ok);
    }
}