Skip to main content

mz_repr/
update.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use bytes::Bytes;
11use itertools::Itertools;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::ops::{Deref, Range};
14use std::sync::Arc;
15use timely::progress::Timestamp;
16
17use crate::{Diff, RowRef};
18
/// An immutable, shared slice. Morally, this is [bytes::Bytes] but with fewer features
/// and supporting arbitrary types.
///
/// Cloning never copies the elements: it copies the range and takes another reference to the
/// shared backing data, so it is available for every `T`.
#[derive(Debug)]
pub struct SharedSlice<T> {
    /// The range of offsets in the backing data that are present in the slice.
    /// (This allows us to subset the slice without reallocating.)
    range: Range<usize>,
    /// The backing data, shared between every slice split or cloned from the same source.
    data: Arc<[T]>,
}

// Written by hand rather than derived: `#[derive(Clone)]` would add a `T: Clone` bound, even
// though cloning only copies the range and bumps the `Arc` reference count.
impl<T> Clone for SharedSlice<T> {
    fn clone(&self) -> Self {
        Self {
            range: self.range.clone(),
            data: Arc::clone(&self.data),
        }
    }
}
28
29impl<T> SharedSlice<T> {
30    /// Split this slice in half at the provided offset.
31    pub fn split_at(self, offset: usize) -> (Self, Self) {
32        let Self { range, data } = self;
33        assert!(offset <= range.len());
34        let offset = range.start + offset;
35        (
36            Self {
37                range: range.start..offset,
38                data: Arc::clone(&data),
39            },
40            Self {
41                range: offset..range.end,
42                data,
43            },
44        )
45    }
46}
47
48impl<T> Deref for SharedSlice<T> {
49    type Target = [T];
50
51    fn deref(&self) -> &Self::Target {
52        &self.data[self.range.clone()]
53    }
54}
55
56impl<T: Serialize> Serialize for SharedSlice<T> {
57    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
58    where
59        S: Serializer,
60    {
61        (**self).serialize(serializer)
62    }
63}
64
65impl<'de, T: Deserialize<'de>> Deserialize<'de> for SharedSlice<T> {
66    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
67    where
68        D: Deserializer<'de>,
69    {
70        let vec: Vec<T> = Deserialize::deserialize(deserializer)?;
71        Ok(vec.into())
72    }
73}
74
75impl<T: PartialEq> PartialEq for SharedSlice<T> {
76    fn eq(&self, other: &Self) -> bool {
77        **self == **other
78    }
79}
80
81impl<T: Eq> Eq for SharedSlice<T> {}
82
83impl<T> Default for SharedSlice<T> {
84    fn default() -> Self {
85        vec![].into()
86    }
87}
88
89impl<T> From<Vec<T>> for SharedSlice<T> {
90    fn from(data: Vec<T>) -> Self {
91        Self {
92            range: 0..data.len(),
93            data: data.into(),
94        }
95    }
96}
97
/// An append-only builder for [Rows]. See [Rows].
#[derive(Debug)]
pub struct RowsBuilder {
    /// The data of every pushed row, appended back to back.
    bytes: Vec<u8>,
    /// For each pushed row, the length of `bytes` right after that row was appended,
    /// i.e. the offset at which the row's data ends.
    run_ends: Vec<usize>,
}
104
105impl RowsBuilder {
106    pub fn push(&mut self, row: &RowRef) {
107        self.bytes.extend(row.data());
108        self.run_ends.push(self.bytes.len());
109    }
110
111    pub fn build(self) -> Rows {
112        Rows {
113            bytes: self.bytes.into(),
114            run_start: 0,
115            run_ends: self.run_ends.into(),
116        }
117    }
118}
119
/// A packed representation of a set of rows.
///
/// The data of all rows is stored back to back in a single byte buffer, alongside the offset
/// at which each row ends.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Rows {
    /// The concatenated data of the rows contained in this [Rows].
    bytes: Bytes,
    /// After a split, `bytes` only covers the rows currently contained in this [Rows], but
    /// `run_ends` still holds offsets into the original, un-split buffer. This is the original
    /// offset of the first byte of `bytes`; it is subtracted from every entry of `run_ends`
    /// to obtain an offset into `bytes`.
    run_start: usize,
    /// For each row, the offset in the original un-split buffer at which its data ends.
    run_ends: SharedSlice<usize>,
}
131
132impl Rows {
133    pub fn builder(byte_size_hint: usize, row_size_hint: usize) -> RowsBuilder {
134        RowsBuilder {
135            bytes: Vec::with_capacity(byte_size_hint),
136            run_ends: Vec::with_capacity(row_size_hint),
137        }
138    }
139    pub fn get(&self, index: usize) -> Option<&RowRef> {
140        if index >= self.run_ends.len() {
141            return None;
142        }
143        let lo = if index == 0 {
144            0
145        } else {
146            self.run_ends[index - 1] - self.run_start
147        };
148        let hi = self.run_ends[index] - self.run_start;
149        // SAFETY: endpoints and data taken from a pushed encoded run.
150        Some(unsafe { RowRef::from_slice(&self.bytes[lo..hi]) })
151    }
152
153    pub fn iter(&self) -> impl Iterator<Item = &RowRef> {
154        [0].into_iter()
155            .chain(self.run_ends.iter().map(|i| *i - self.run_start))
156            .tuple_windows()
157            .map(|(lo, hi)| {
158                // SAFETY: endpoints and data taken from a pushed encoded run.
159                unsafe { RowRef::from_slice(&self.bytes[lo..hi]) }
160            })
161    }
162
163    pub fn split_at(mut self, mid: usize) -> (Self, Self) {
164        let (run_ends_a, run_ends_b) = self.run_ends.split_at(mid);
165        let byte_mid = run_ends_a.last().map_or(0, |e| *e - self.run_start);
166        let bytes_b = self.bytes.split_off(byte_mid);
167        (
168            Self {
169                bytes: self.bytes,
170                run_start: self.run_start,
171                run_ends: run_ends_a,
172            },
173            Self {
174                bytes: bytes_b,
175                run_start: self.run_start + byte_mid,
176                run_ends: run_ends_b,
177            },
178        )
179    }
180
181    pub fn len(&self) -> usize {
182        self.run_ends.len()
183    }
184
185    pub fn byte_len(&self) -> usize {
186        self.bytes.len()
187    }
188}
189
/// An append-only builder for [UpdateCollection]; obtain one from
/// [UpdateCollection::builder].
#[derive(Debug)]
pub struct UpdateCollectionBuilder<T = crate::Timestamp> {
    /// The rows of the pushed updates, in push order.
    rows: RowsBuilder,
    /// The times of the pushed updates, in push order.
    times: Vec<T>,
    /// The diffs of the pushed updates, in push order.
    diffs: Vec<Diff>,
}
196
197impl<T: Timestamp> UpdateCollectionBuilder<T> {
198    pub fn push(&mut self, (row, time, diff): (&RowRef, &T, Diff)) {
199        self.rows.push(row);
200        self.times.push(time.clone());
201        self.diffs.push(diff);
202    }
203
204    pub fn build(self) -> UpdateCollection<T> {
205        UpdateCollection {
206            rows: self.rows.build(),
207            times: self.times.into(),
208            diffs: self.diffs.into(),
209        }
210    }
211}
212
/// A collection of row-time-diff updates in a columnar format.
///
/// The update at position `i` is made up of the `i`-th entry of each of the three columns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateCollection<T = crate::Timestamp> {
    /// The row of each update.
    rows: Rows,
    /// The time of each update.
    times: SharedSlice<T>,
    /// The diff of each update.
    diffs: SharedSlice<Diff>,
}
220
221impl<T> Default for UpdateCollection<T> {
222    fn default() -> Self {
223        Self {
224            rows: Default::default(),
225            times: Default::default(),
226            diffs: Default::default(),
227        }
228    }
229}
230
231impl<'a, T: Timestamp> FromIterator<(&'a RowRef, &'a T, Diff)> for UpdateCollection<T> {
232    fn from_iter<I: IntoIterator<Item = (&'a RowRef, &'a T, Diff)>>(iter: I) -> Self {
233        let iter = iter.into_iter();
234        let len_hint = iter.size_hint().0;
235        let bytes_hint = len_hint * 8;
236        let mut builder = UpdateCollection::builder(bytes_hint, len_hint);
237        for row in iter {
238            builder.push(row);
239        }
240        builder.build()
241    }
242}
243
244impl<T> UpdateCollection<T> {
245    pub fn builder(byte_size_hint: usize, row_size_hint: usize) -> UpdateCollectionBuilder<T> {
246        UpdateCollectionBuilder {
247            rows: Rows::builder(byte_size_hint, row_size_hint),
248            times: Vec::with_capacity(row_size_hint),
249            diffs: Vec::with_capacity(row_size_hint),
250        }
251    }
252
253    pub fn get(&self, index: usize) -> Option<(&RowRef, &T, Diff)> {
254        Some((
255            self.rows.get(index)?,
256            self.times.get(index)?,
257            *self.diffs.get(index)?,
258        ))
259    }
260
261    pub fn iter(&self) -> impl Iterator<Item = (&RowRef, &T, Diff)> {
262        itertools::multizip((
263            self.rows.iter(),
264            self.times.iter(),
265            self.diffs.iter().cloned(),
266        ))
267    }
268
269    pub fn split_at(self, index: usize) -> (Self, Self) {
270        let (rows_a, rows_b) = self.rows.split_at(index);
271        let (times_a, times_b) = self.times.split_at(index);
272        let (diffs_a, diffs_b) = self.diffs.split_at(index);
273        (
274            Self {
275                rows: rows_a,
276                times: times_a,
277                diffs: diffs_a,
278            },
279            Self {
280                rows: rows_b,
281                times: times_b,
282                diffs: diffs_b,
283            },
284        )
285    }
286
287    pub fn times(&self) -> &[T] {
288        &*self.times
289    }
290
291    pub fn byte_len(&self) -> usize {
292        self.rows.byte_len()
293    }
294
295    pub fn len(&self) -> usize {
296        self.rows.len()
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::array::uniform;
    use proptest::collection::vec;
    use proptest::prelude::*;

    /// Splitting a [SharedSlice], and then splitting one of the results again, must agree with
    /// doing the same to an ordinary slice.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_slice_splits() {
        proptest!(|(data in vec(0u64..1000u64, 0..20), [a, b] in uniform(0usize..20))| {
            let sliceable: SharedSlice<u64> = data.clone().into();
            let mid = a.clamp(0, data.len());
            let data = data.split_at(mid);
            let sliceable = sliceable.split_at(mid);
            assert_eq!(data.0, &*sliceable.0);
            assert_eq!(data.1, &*sliceable.1);
            let mid = b.clamp(0, data.0.len());
            let data = data.0.split_at(mid);
            let sliceable = sliceable.0.split_at(mid);
            assert_eq!(data.0, &*sliceable.0);
            assert_eq!(data.1, &*sliceable.1);
        });
    }

    /// Splitting [Rows], and then splitting both results again, must agree with doing the
    /// same to a slice of rows. Both halves of every split are checked, since only the second
    /// half has its byte offsets shifted.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_rows_splits() {
        proptest!(|(data in vec(any::<crate::Row>(), 0..8), [a, b] in uniform(0usize..8))| {
            let mut rows = Rows::builder(0, 0);
            for row in &data {
                rows.push(row.as_row_ref());
            }
            let rows = rows.build();

            let mid = a.clamp(0, data.len());
            let data = data.split_at(mid);
            let rows = rows.split_at(mid);
            assert!(data.0.iter().map(|r| r.as_row_ref()).eq(rows.0.iter()));
            assert!(data.1.iter().map(|r| r.as_row_ref()).eq(rows.1.iter()));
            assert_eq!(rows.0.len(), mid);
            assert_eq!(rows.1.len(), data.1.len());

            // Splitting consumes its input, so record the lengths beforehand.
            let len_0 = rows.0.len();
            let len_1 = rows.1.len();

            let mid = b.clamp(0, data.0.len());
            let data_0 = data.0.split_at(mid);
            let rows_0 = rows.0.split_at(mid);
            assert!(data_0.0.iter().map(|r| r.as_row_ref()).eq(rows_0.0.iter()));
            assert!(data_0.1.iter().map(|r| r.as_row_ref()).eq(rows_0.1.iter()));
            assert_eq!(rows_0.0.len(), mid);
            assert_eq!(rows_0.0.len() + rows_0.1.len(), len_0);

            let mid = b.clamp(0, data.1.len());
            let data_1 = data.1.split_at(mid);
            let rows_1 = rows.1.split_at(mid);
            assert!(data_1.0.iter().map(|r| r.as_row_ref()).eq(rows_1.0.iter()));
            assert!(data_1.1.iter().map(|r| r.as_row_ref()).eq(rows_1.1.iter()));
            assert_eq!(rows_1.0.len(), mid);
            assert_eq!(rows_1.0.len() + rows_1.1.len(), len_1);
        });
    }
}
355}