1use bytes::Bytes;
11use itertools::Itertools;
12use serde::{Deserialize, Deserializer, Serialize, Serializer};
13use std::ops::{Deref, Range};
14use std::sync::Arc;
15use timely::progress::Timestamp;
16
17use crate::{Diff, RowRef};
18
/// A cheaply cloneable, immutable view of a contiguous range of a shared slice.
///
/// Cloning and splitting a `SharedSlice` never copies elements. Every view holds an
/// [`Arc`] to the same backing allocation and records which `range` of it is visible.
#[derive(Debug)]
pub struct SharedSlice<T> {
    /// The range of `data` visible through this view.
    range: Range<usize>,
    /// The full backing allocation, possibly shared with other views.
    data: Arc<[T]>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a `T: Clone`
// bound, but cloning a view only bumps the `Arc` refcount and never clones elements.
impl<T> Clone for SharedSlice<T> {
    fn clone(&self) -> Self {
        Self {
            range: self.range.clone(),
            data: Arc::clone(&self.data),
        }
    }
}

impl<T> SharedSlice<T> {
    /// Splits this view into two at `offset`, without copying any elements.
    ///
    /// Relative to this view, the first result covers `[0, offset)` and the second
    /// covers `[offset, len)`. Both share this view's backing allocation.
    ///
    /// # Panics
    ///
    /// Panics if `offset > self.len()`.
    pub fn split_at(self, offset: usize) -> (Self, Self) {
        let Self { range, data } = self;
        assert!(offset <= range.len());
        // Translate the view-relative offset into an index into `data`.
        let offset = range.start + offset;
        (
            Self {
                range: range.start..offset,
                data: Arc::clone(&data),
            },
            Self {
                range: offset..range.end,
                data,
            },
        )
    }
}

impl<T> Deref for SharedSlice<T> {
    type Target = [T];

    /// Exposes only the visible `range` of the backing allocation.
    fn deref(&self) -> &Self::Target {
        &self.data[self.range.clone()]
    }
}
55
56impl<T: Serialize> Serialize for SharedSlice<T> {
57 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
58 where
59 S: Serializer,
60 {
61 (**self).serialize(serializer)
62 }
63}
64
65impl<'de, T: Deserialize<'de>> Deserialize<'de> for SharedSlice<T> {
66 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
67 where
68 D: Deserializer<'de>,
69 {
70 let vec: Vec<T> = Deserialize::deserialize(deserializer)?;
71 Ok(vec.into())
72 }
73}
74
75impl<T: PartialEq> PartialEq for SharedSlice<T> {
76 fn eq(&self, other: &Self) -> bool {
77 **self == **other
78 }
79}
80
81impl<T: Eq> Eq for SharedSlice<T> {}
82
83impl<T> Default for SharedSlice<T> {
84 fn default() -> Self {
85 vec![].into()
86 }
87}
88
89impl<T> From<Vec<T>> for SharedSlice<T> {
90 fn from(data: Vec<T>) -> Self {
91 Self {
92 range: 0..data.len(),
93 data: data.into(),
94 }
95 }
96}
97
/// Accumulates rows for a [`Rows`] collection; obtain one via `Rows::builder`.
#[derive(Debug)]
pub struct RowsBuilder {
    /// The concatenated encodings of every row pushed so far.
    bytes: Vec<u8>,
    /// For each pushed row, the length of `bytes` right after that row was appended,
    /// i.e. the exclusive end offset of the row.
    run_ends: Vec<usize>,
}
104
105impl RowsBuilder {
106 pub fn push(&mut self, row: &RowRef) {
107 self.bytes.extend(row.data());
108 self.run_ends.push(self.bytes.len());
109 }
110
111 pub fn build(self) -> Rows {
112 Rows {
113 bytes: self.bytes.into(),
114 run_start: 0,
115 run_ends: self.run_ends.into(),
116 }
117 }
118}
119
120#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
122pub struct Rows {
123 bytes: Bytes,
124 run_start: usize,
129 run_ends: SharedSlice<usize>,
130}
131
132impl Rows {
133 pub fn builder(byte_size_hint: usize, row_size_hint: usize) -> RowsBuilder {
134 RowsBuilder {
135 bytes: Vec::with_capacity(byte_size_hint),
136 run_ends: Vec::with_capacity(row_size_hint),
137 }
138 }
139 pub fn get(&self, index: usize) -> Option<&RowRef> {
140 if index >= self.run_ends.len() {
141 return None;
142 }
143 let lo = if index == 0 {
144 0
145 } else {
146 self.run_ends[index - 1] - self.run_start
147 };
148 let hi = self.run_ends[index] - self.run_start;
149 Some(unsafe { RowRef::from_slice(&self.bytes[lo..hi]) })
151 }
152
153 pub fn iter(&self) -> impl Iterator<Item = &RowRef> {
154 [0].into_iter()
155 .chain(self.run_ends.iter().map(|i| *i - self.run_start))
156 .tuple_windows()
157 .map(|(lo, hi)| {
158 unsafe { RowRef::from_slice(&self.bytes[lo..hi]) }
160 })
161 }
162
163 pub fn split_at(mut self, mid: usize) -> (Self, Self) {
164 let (run_ends_a, run_ends_b) = self.run_ends.split_at(mid);
165 let byte_mid = run_ends_a.last().map_or(0, |e| *e - self.run_start);
166 let bytes_b = self.bytes.split_off(byte_mid);
167 (
168 Self {
169 bytes: self.bytes,
170 run_start: self.run_start,
171 run_ends: run_ends_a,
172 },
173 Self {
174 bytes: bytes_b,
175 run_start: self.run_start + byte_mid,
176 run_ends: run_ends_b,
177 },
178 )
179 }
180
181 pub fn len(&self) -> usize {
182 self.run_ends.len()
183 }
184
185 pub fn byte_len(&self) -> usize {
186 self.bytes.len()
187 }
188}
189
/// Accumulates `(row, time, diff)` updates for an [`UpdateCollection`]; obtain one via
/// `UpdateCollection::builder`.
#[derive(Debug)]
pub struct UpdateCollectionBuilder<T = crate::Timestamp> {
    /// The row of each pushed update.
    rows: RowsBuilder,
    /// The time of each pushed update, parallel to `rows`.
    times: Vec<T>,
    /// The diff of each pushed update, parallel to `rows`.
    diffs: Vec<Diff>,
}
196
197impl<T: Timestamp> UpdateCollectionBuilder<T> {
198 pub fn push(&mut self, (row, time, diff): (&RowRef, &T, Diff)) {
199 self.rows.push(row);
200 self.times.push(time.clone());
201 self.diffs.push(diff);
202 }
203
204 pub fn build(self) -> UpdateCollection<T> {
205 UpdateCollection {
206 rows: self.rows.build(),
207 times: self.times.into(),
208 diffs: self.diffs.into(),
209 }
210 }
211}
212
/// An immutable collection of `(row, time, diff)` updates, stored column-wise. Rows,
/// times, and diffs live in three parallel sequences indexed by update position.
///
/// `split_at` reuses the existing buffers (`Rows::split_at` and `SharedSlice::split_at`)
/// rather than copying updates.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateCollection<T = crate::Timestamp> {
    /// The row of each update.
    rows: Rows,
    /// The time of each update, parallel to `rows`.
    times: SharedSlice<T>,
    /// The diff of each update, parallel to `rows`.
    diffs: SharedSlice<Diff>,
}
220
221impl<T> Default for UpdateCollection<T> {
222 fn default() -> Self {
223 Self {
224 rows: Default::default(),
225 times: Default::default(),
226 diffs: Default::default(),
227 }
228 }
229}
230
231impl<'a, T: Timestamp> FromIterator<(&'a RowRef, &'a T, Diff)> for UpdateCollection<T> {
232 fn from_iter<I: IntoIterator<Item = (&'a RowRef, &'a T, Diff)>>(iter: I) -> Self {
233 let iter = iter.into_iter();
234 let len_hint = iter.size_hint().0;
235 let bytes_hint = len_hint * 8;
236 let mut builder = UpdateCollection::builder(bytes_hint, len_hint);
237 for row in iter {
238 builder.push(row);
239 }
240 builder.build()
241 }
242}
243
244impl<T> UpdateCollection<T> {
245 pub fn builder(byte_size_hint: usize, row_size_hint: usize) -> UpdateCollectionBuilder<T> {
246 UpdateCollectionBuilder {
247 rows: Rows::builder(byte_size_hint, row_size_hint),
248 times: Vec::with_capacity(row_size_hint),
249 diffs: Vec::with_capacity(row_size_hint),
250 }
251 }
252
253 pub fn get(&self, index: usize) -> Option<(&RowRef, &T, Diff)> {
254 Some((
255 self.rows.get(index)?,
256 self.times.get(index)?,
257 *self.diffs.get(index)?,
258 ))
259 }
260
261 pub fn iter(&self) -> impl Iterator<Item = (&RowRef, &T, Diff)> {
262 itertools::multizip((
263 self.rows.iter(),
264 self.times.iter(),
265 self.diffs.iter().cloned(),
266 ))
267 }
268
269 pub fn split_at(self, index: usize) -> (Self, Self) {
270 let (rows_a, rows_b) = self.rows.split_at(index);
271 let (times_a, times_b) = self.times.split_at(index);
272 let (diffs_a, diffs_b) = self.diffs.split_at(index);
273 (
274 Self {
275 rows: rows_a,
276 times: times_a,
277 diffs: diffs_a,
278 },
279 Self {
280 rows: rows_b,
281 times: times_b,
282 diffs: diffs_b,
283 },
284 )
285 }
286
287 pub fn times(&self) -> &[T] {
288 &*self.times
289 }
290
291 pub fn byte_len(&self) -> usize {
292 self.rows.byte_len()
293 }
294
295 pub fn len(&self) -> usize {
296 self.rows.len()
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::array::uniform;
    use proptest::collection::vec;
    use proptest::prelude::*;

    /// Asserts that `rows` holds exactly the rows in `expected`. Checks `len`, `iter`,
    /// indexed `get`, and the out-of-bounds `get`.
    fn assert_rows_eq(expected: &[crate::Row], rows: &Rows) {
        assert_eq!(rows.len(), expected.len());
        assert!(expected.iter().map(|r| r.as_row_ref()).eq(rows.iter()));
        for (i, row) in expected.iter().enumerate() {
            assert!(rows.get(i) == Some(row.as_row_ref()));
        }
        assert!(rows.get(expected.len()).is_none());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_slice_splits() {
        proptest!(|(data in vec(0u64..1000u64, 0..20), [a, b] in uniform(0usize..20))| {
            let sliceable: SharedSlice<u64> = data.clone().into();
            let mid = a.clamp(0, data.len());
            let data = data.split_at(mid);
            let sliceable = sliceable.split_at(mid);
            assert_eq!(data.0, &*sliceable.0);
            assert_eq!(data.1, &*sliceable.1);
            let mid = b.clamp(0, data.0.len());
            let data = data.0.split_at(mid);
            let sliceable = sliceable.0.split_at(mid);
            assert_eq!(data.0, &*sliceable.0);
            assert_eq!(data.1, &*sliceable.1);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_rows_splits() {
        proptest!(|(data in vec(any::<crate::Row>(), 0..8), [a, b] in uniform(0usize..8))| {
            let mut rows = Rows::builder(0, 0);
            for row in &data {
                rows.push(row.as_row_ref());
            }
            let rows = rows.build();
            assert_rows_eq(&data, &rows);
            let byte_len = rows.byte_len();

            // First split: check both halves, and that no bytes are lost or duplicated.
            let mid = a.clamp(0, data.len());
            let data = data.split_at(mid);
            let rows = rows.split_at(mid);
            assert_rows_eq(data.0, &rows.0);
            assert_rows_eq(data.1, &rows.1);
            assert_eq!(rows.0.byte_len() + rows.1.byte_len(), byte_len);

            // Split the first half again; this keeps `run_start` unchanged for its first part.
            let mid = b.clamp(0, data.0.len());
            let data_0 = data.0.split_at(mid);
            let rows_0 = rows.0.split_at(mid);
            assert_rows_eq(data_0.0, &rows_0.0);
            assert_rows_eq(data_0.1, &rows_0.1);

            // Split the second half again, which already has a non-zero `run_start`.
            let mid = b.clamp(0, data.1.len());
            let data_1 = data.1.split_at(mid);
            let rows_1 = rows.1.split_at(mid);
            assert_rows_eq(data_1.0, &rows_1.0);
            assert_rows_eq(data_1.1, &rows_1.1);
        });
    }
}