mz_expr/row/
collection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Defines types for working with collections of [`Row`].
11
12use std::cell::RefCell;
13use std::cmp::Reverse;
14use std::collections::BinaryHeap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use mz_repr::{DatumVec, IntoRowIterator, Row, RowIterator, RowRef};
20use serde::{Deserialize, Serialize};
21
22use crate::ColumnOrder;
23
24/// Collection of runs of sorted [`Row`]s represented as a single blob.
25///
26/// Note: the encoding format we use to represent [`Row`]s in this struct is
27/// not stable, and thus should never be persisted durably.
28#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
29pub struct RowCollection {
30    /// Contiguous blob of encoded Rows.
31    encoded: Bytes,
32    /// Metadata about an individual Row in the blob.
33    metadata: Arc<[EncodedRowMetadata]>,
34    /// Ends of non-empty, sorted runs of rows in index into `metadata`.
35    runs: Vec<usize>,
36}
37
38impl RowCollection {
39    /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`.
40    ///
41    /// Note that all row collections to be merged must be constructed with the same `order_by`
42    /// to ensure a consistent sort order. Anything else is undefined behavior.
43    // TODO: Remember the `order_by` and assert that it is the same for all collections.
44    pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
45        // Sort data to maintain sortedness invariants.
46        if order_by.is_empty() {
47            // Skip row decoding if not required.
48            rows.sort();
49        } else {
50            let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new());
51            rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
52                let borrow1 = datum_vec1.borrow_with(row1);
53                let borrow2 = datum_vec2.borrow_with(row2);
54                crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2))
55            });
56        }
57
58        // Pre-sizing our buffer should allow us to make just 1 allocation, and
59        // use the perfect amount of memory.
60        //
61        // Note(parkmycar): I didn't do any benchmarking to determine if this
62        // is faster, so feel free to change this if you'd like.
63        let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
64
65        let mut encoded = Vec::<u8>::with_capacity(encoded_size);
66        let mut metadata = Vec::<EncodedRowMetadata>::with_capacity(rows.len());
67        let runs = (!rows.is_empty())
68            .then(|| vec![rows.len()])
69            .unwrap_or_default();
70
71        for (row, diff) in rows {
72            encoded.extend(row.data());
73            metadata.push(EncodedRowMetadata {
74                offset: encoded.len(),
75                diff,
76            });
77        }
78
79        RowCollection {
80            encoded: Bytes::from(encoded),
81            metadata: metadata.into(),
82            runs,
83        }
84    }
85
86    /// Merge another [`RowCollection`] into `self`.
87    pub fn merge(&mut self, other: &RowCollection) {
88        if other.count(0, None) == 0 {
89            return;
90        }
91
92        // TODO(parkmycar): Using SegmentedBytes here would be nice.
93        let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()];
94        new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]);
95        new_bytes[self.encoded.len()..].copy_from_slice(&other.encoded[..]);
96
97        let mapped_metas = other.metadata.iter().map(|meta| EncodedRowMetadata {
98            offset: meta.offset + self.encoded.len(),
99            diff: meta.diff,
100        });
101        let self_len = self.metadata.len();
102
103        self.metadata = self.metadata.iter().cloned().chain(mapped_metas).collect();
104        self.encoded = Bytes::from(new_bytes);
105        self.runs.extend(other.runs.iter().map(|f| f + self_len));
106    }
107
108    /// Total count of [`Row`]s represented by this collection, considering a
109    /// possible `OFFSET` and `LIMIT`.
110    pub fn count(&self, offset: usize, limit: Option<usize>) -> usize {
111        let mut total: usize = self.metadata.iter().map(|meta| meta.diff.get()).sum();
112
113        // Consider a possible OFFSET.
114        total = total.saturating_sub(offset);
115
116        // Consider a possible LIMIT.
117        if let Some(limit) = limit {
118            total = std::cmp::min(limit, total);
119        }
120
121        total
122    }
123
124    /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection.
125    pub fn entries(&self) -> usize {
126        self.metadata.len()
127    }
128
129    /// Returns the number of bytes this [`RowCollection`] uses.
130    pub fn byte_len(&self) -> usize {
131        let row_data_size = self.encoded.len();
132        let metadata_size = self
133            .metadata
134            .len()
135            .saturating_mul(std::mem::size_of::<EncodedRowMetadata>());
136
137        row_data_size.saturating_add(metadata_size)
138    }
139
140    /// Returns a [`RowRef`] for the entry at `idx`, if one exists.
141    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
142        let (lower_offset, upper) = match idx {
143            0 => (0, self.metadata.get(idx)?),
144            _ => {
145                let lower = self.metadata.get(idx - 1).map(|m| m.offset)?;
146                let upper = self.metadata.get(idx)?;
147                (lower, upper)
148            }
149        };
150
151        let slice = &self.encoded[lower_offset..upper.offset];
152        let row = RowRef::from_slice(slice);
153
154        Some((row, upper))
155    }
156
157    /// "Sorts" the [`RowCollection`] by the column order in `order_by`. Returns a sorted view over
158    /// the collection.
159    pub fn sorted_view(self, order_by: &[ColumnOrder]) -> SortedRowCollection {
160        if order_by.is_empty() {
161            self.sorted_view_inner(&Ord::cmp)
162        } else {
163            let left_datum_vec = RefCell::new(mz_repr::DatumVec::new());
164            let right_datum_vec = RefCell::new(mz_repr::DatumVec::new());
165
166            let cmp = &|left: &RowRef, right: &RowRef| {
167                let (mut left_datum_vec, mut right_datum_vec) =
168                    (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut());
169                let left_datums = left_datum_vec.borrow_with(left);
170                let right_datums = right_datum_vec.borrow_with(right);
171                crate::compare_columns(order_by, &left_datums, &right_datums, || left.cmp(right))
172            };
173            self.sorted_view_inner(cmp)
174        }
175    }
176
177    fn sorted_view_inner<F>(self, cmp: &F) -> SortedRowCollection
178    where
179        F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
180    {
181        let mut heap = BinaryHeap::with_capacity(self.runs.len());
182
183        for index in 0..self.runs.len() {
184            let start = (index > 0).then(|| self.runs[index - 1]).unwrap_or(0);
185            let end = self.runs[index];
186
187            heap.push(Reverse(RunIter {
188                collection: &self,
189                cmp,
190                range: start..end,
191            }));
192        }
193
194        let mut view = Vec::with_capacity(self.metadata.len());
195
196        while let Some(Reverse(mut run)) = heap.pop() {
197            if let Some(next) = run.range.next() {
198                view.push(next);
199                if !run.range.is_empty() {
200                    heap.push(Reverse(run));
201                }
202            }
203        }
204
205        SortedRowCollection {
206            collection: self,
207            sorted_view: view.into(),
208        }
209    }
210}
211
212/// Inner type of [`RowCollection`], describes a single Row.
213#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
214pub struct EncodedRowMetadata {
215    /// Offset into the binary blob of encoded rows.
216    ///
217    /// TODO(parkmycar): Consider making this a `u32`.
218    offset: usize,
219    /// Diff for the Row.
220    ///
221    /// TODO(parkmycar): Consider making this a smaller type, note that some compute introspection
222    /// collections, e.g. `mz_scheduling_elapsed_raw`, encodes nano seconds in the diff field which
223    /// requires a u64.
224    diff: NonZeroUsize,
225}
226
227/// Provides a sorted view of a [`RowCollection`].
228#[derive(Debug, Clone)]
229pub struct SortedRowCollection {
230    /// The inner [`RowCollection`].
231    collection: RowCollection,
232    /// Indexes into the inner collection that represent the sorted order.
233    sorted_view: Arc<[usize]>,
234}
235
236impl SortedRowCollection {
237    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
238        self.sorted_view
239            .get(idx)
240            .and_then(|inner_idx| self.collection.get(*inner_idx))
241    }
242}
243
244#[derive(Debug, Clone)]
245pub struct SortedRowCollectionIter {
246    /// Collection we're iterating over.
247    collection: SortedRowCollection,
248
249    /// Index for the row we're currently referencing.
250    row_idx: usize,
251    /// Number of diffs we've emitted for the current row.
252    diff_idx: usize,
253
254    /// Maximum number of rows this iterator will yield.
255    limit: Option<usize>,
256    /// Number of rows we're offset by.
257    ///
258    /// Note: We eagerly apply an offset, but we track it here so we can
259    /// accurately report [`RowIterator::count`].
260    offset: usize,
261
262    /// Columns to underlying rows to include.
263    projection: Option<Vec<usize>>,
264    /// Allocations that we reuse for every iteration to project columns.
265    projection_buf: (DatumVec, Row),
266}
267
268impl SortedRowCollectionIter {
269    /// Returns the inner `SortedRowCollection`.
270    pub fn into_inner(self) -> SortedRowCollection {
271        self.collection
272    }
273
274    /// Immediately applies an offset to this iterator.
275    pub fn apply_offset(mut self, offset: usize) -> SortedRowCollectionIter {
276        Self::advance_by(
277            &self.collection,
278            &mut self.row_idx,
279            &mut self.diff_idx,
280            offset,
281        );
282
283        // Keep track of how many rows we've offset by.
284        self.offset = self.offset.saturating_add(offset);
285
286        self
287    }
288
289    /// Sets the limit for this iterator.
290    pub fn with_limit(mut self, limit: usize) -> SortedRowCollectionIter {
291        self.limit = Some(limit);
292        self
293    }
294
295    /// Specify the columns that should be yielded.
296    pub fn with_projection(mut self, projection: Vec<usize>) -> SortedRowCollectionIter {
297        // Omit the projection if it would be a no-op to avoid a relatively expensive memcpy.
298        if let Some((row, _)) = self.collection.get(0) {
299            let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
300            if projection.iter().copied().eq(cols) {
301                return self;
302            }
303        }
304
305        self.projection = Some(projection);
306        self
307    }
308
309    /// Helper method for implementing [`RowIterator`].
310    ///
311    /// Advances the internal pointers by the specified amount.
312    fn advance_by(
313        collection: &SortedRowCollection,
314        row_idx: &mut usize,
315        diff_idx: &mut usize,
316        mut count: usize,
317    ) {
318        while count > 0 {
319            let Some((_, row_meta)) = collection.get(*row_idx) else {
320                return;
321            };
322
323            let remaining_diff = row_meta.diff.get() - *diff_idx;
324            if remaining_diff <= count {
325                *diff_idx = 0;
326                *row_idx += 1;
327                count -= remaining_diff;
328            } else {
329                *diff_idx += count;
330                count = 0;
331            }
332        }
333    }
334
335    /// Helper method for implementing [`RowIterator`].
336    ///
337    /// Projects columns for the provided `row`.
338    fn project<'a>(
339        projection: Option<&[usize]>,
340        row: &'a RowRef,
341        datum_buf: &'a mut DatumVec,
342        row_buf: &'a mut Row,
343    ) -> &'a RowRef {
344        if let Some(projection) = projection {
345            // Copy the required columns into our reusable buffer.
346            {
347                let datums = datum_buf.borrow_with(row);
348                row_buf
349                    .packer()
350                    .extend(projection.iter().map(|i| &datums[*i]));
351            }
352
353            row_buf
354        } else {
355            row
356        }
357    }
358}
359
360impl RowIterator for SortedRowCollectionIter {
361    fn next(&mut self) -> Option<&RowRef> {
362        // Bail if we've reached our limit.
363        if let Some(0) = self.limit {
364            return None;
365        }
366
367        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
368
369        // If we're about to yield a row, then subtract from our limit.
370        if let Some(limit) = &mut self.limit {
371            *limit = limit.saturating_sub(1);
372        }
373
374        // Advance to the next row.
375        Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
376
377        // Project away and/or re-order any columns.
378        let (datum_buf, row_buf) = &mut self.projection_buf;
379        Some(Self::project(
380            self.projection.as_deref(),
381            row,
382            datum_buf,
383            row_buf,
384        ))
385    }
386
387    fn peek(&mut self) -> Option<&RowRef> {
388        // Bail if we've reached our limit.
389        if let Some(0) = self.limit {
390            return None;
391        }
392
393        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
394
395        // Note: Unlike `next()` we do not subtract from our limit, nor advance
396        // the internal pointers.
397
398        // Project away and/or re-order any columns.
399        let (datum_buf, row_buf) = &mut self.projection_buf;
400        Some(Self::project(
401            self.projection.as_deref(),
402            row,
403            datum_buf,
404            row_buf,
405        ))
406    }
407
408    fn count(&self) -> usize {
409        self.collection.collection.count(self.offset, self.limit)
410    }
411
412    fn box_clone(&self) -> Box<dyn RowIterator> {
413        Box::new(self.clone())
414    }
415}
416
417impl IntoRowIterator for SortedRowCollection {
418    type Iter = SortedRowCollectionIter;
419
420    fn into_row_iter(self) -> Self::Iter {
421        SortedRowCollectionIter {
422            collection: self,
423            row_idx: 0,
424            diff_idx: 0,
425            limit: None,
426            offset: 0,
427            projection: None,
428            // Note: Neither of these types allocate until elements are pushed in.
429            projection_buf: (DatumVec::new(), Row::default()),
430        }
431    }
432}
433
434/// Iterator-like struct to help with extracting rows in sorted order from `RowCollection`.
435struct RunIter<'a, F> {
436    collection: &'a RowCollection,
437    cmp: &'a F,
438    range: std::ops::Range<usize>,
439}
440
441impl<'a, F> PartialOrd for RunIter<'a, F>
442where
443    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
444{
445    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
446        Some(self.cmp(other))
447    }
448}
449
450impl<'a, F> Ord for RunIter<'a, F>
451where
452    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
453{
454    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
455        let left = self.collection.get(self.range.start).unwrap().0;
456        let right = self.collection.get(other.range.start).unwrap().0;
457        (self.cmp)(left, right)
458    }
459}
460
461impl<'a, F> PartialEq for RunIter<'a, F>
462where
463    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
464{
465    fn eq(&self, other: &Self) -> bool {
466        self.cmp(other) == std::cmp::Ordering::Equal
467    }
468}
469
470impl<'a, F> Eq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {}
471
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use mz_ore::assert_none;
    use mz_repr::Datum;
    use proptest::prelude::*;
    use proptest::test_runner::Config;

    use super::*;

    /// Test-only constructor: encodes the rows in the given order (no sorting), each with a
    /// diff of 1, as a single run.
    impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
        fn from(rows: T) -> Self {
            let mut encoded = Vec::<u8>::new();
            let mut metadata = Vec::<EncodedRowMetadata>::new();

            for row in rows {
                encoded.extend(row.data());
                metadata.push(EncodedRowMetadata {
                    offset: encoded.len(),
                    diff: NonZeroUsize::MIN,
                });
            }

            // Mirror `RowCollection::new`: runs must be non-empty, so an empty collection has
            // no runs at all.
            let runs = if metadata.is_empty() {
                Vec::new()
            } else {
                vec![metadata.len()]
            };

            RowCollection {
                encoded: Bytes::from(encoded),
                metadata: metadata.into(),
                runs,
            }
        }
    }

    /// Rows can be read back by index exactly as they were put in.
    #[mz_ore::test]
    fn test_row_collection() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let collection = RowCollection::from([&a, &b]);

        let (a_rnd, _) = collection.get(0).unwrap();
        assert_eq!(a_rnd, a.borrow());

        let (b_rnd, _) = collection.get(1).unwrap();
        assert_eq!(b_rnd, b.borrow());
    }

    /// Merging appends the rows of the other collection after our own.
    #[mz_ore::test]
    fn test_merge() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let mut a_col = RowCollection::from([&a]);
        let b_col = RowCollection::from([&b]);

        a_col.merge(&b_col);

        assert_eq!(a_col.count(0, None), 2);
        assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
        assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));
    }

    /// The sorted view over two merged sorted runs matches a plain sort of all rows.
    #[mz_ore::test]
    fn test_sort() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
        let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
        let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);

        let col = {
            let mut part = [&a, &b];
            part.sort_by(|a, b| a.cmp(b));
            let mut part1 = RowCollection::from(part);
            let mut part = [&c, &d];
            part.sort_by(|a, b| a.cmp(b));
            let part2 = RowCollection::from(part);
            part1.merge(&part2);
            part1
        };
        let mut rows = [a, b, c, d];

        let sorted_view = col.sorted_view(&[]);
        rows.sort_by(|a, b| a.cmp(b));

        for i in 0..rows.len() {
            let (row_x, _) = sorted_view.get(i).unwrap();
            let row_y = rows.get(i).unwrap();

            assert_eq!(row_x, row_y.borrow());
        }
    }

    /// The iterator yields every row once per unit of its diff, in sorted order.
    #[mz_ore::test]
    fn test_sorted_iter() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        col.merge(&RowCollection::new(
            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
            &[],
        ));
        let col = col.sorted_view(&[]);
        let mut iter = col.into_row_iter();

        // Peek shouldn't advance the iterator.
        assert_eq!(iter.peek(), Some(b.borrow()));

        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);

        // For good measure make sure we don't panic.
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
    }

    /// Offsets skip yielded rows, also across row boundaries and past the end.
    #[mz_ore::test]
    fn test_sorted_iter_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        col.merge(&RowCollection::new(
            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
            &[],
        ));
        let col = col.sorted_view(&[]);

        // Test with a reasonable offset that does not span rows.
        let mut iter = col.into_row_iter().apply_offset(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with an offset that spans the first row.
        let mut iter = col.into_row_iter().apply_offset(3);

        assert_eq!(iter.peek(), Some(a.borrow()));

        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with an offset that passes the entire collection.
        let mut iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    /// Limits cap the number of yielded rows, including a limit of zero.
    #[mz_ore::test]
    fn test_sorted_iter_limit() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        col.merge(&RowCollection::new(
            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
            &[],
        ));
        let col = col.sorted_view(&[]);

        // Test with a limit that spans only the first row.
        let mut iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit that spans both rows.
        let mut iter = col.into_row_iter().with_limit(4);
        assert_eq!(iter.peek(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));

        assert_eq!(iter.peek(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));

        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit that is more rows than we have.
        let mut iter = col.into_row_iter().with_limit(1000);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit of 0.
        let mut iter = col.into_row_iter().with_limit(0);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    /// `map` is usable through a boxed `dyn RowIterator`.
    #[mz_ore::test]
    fn test_mapped_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = col.sorted_view(&[]);

        // Make sure we can call `.map` on a `dyn RowIterator`.
        let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());

        let mut mapped = iter.map(|f| f.to_owned());
        assert!(mapped.next().is_some());
        assert!(mapped.next().is_some());
        assert!(mapped.next().is_some());
        assert_none!(mapped.next());
        assert_none!(mapped.next());
    }

    /// Projections can drop, keep and re-order columns.
    #[mz_ore::test]
    fn test_projected_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);
        let col = col.sorted_view(&[]);

        // Project away the first column.
        let mut iter = col.into_row_iter().with_projection(vec![1]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Project away all columns.
        let mut iter = col.into_row_iter().with_projection(vec![]);

        let projected_a = Row::default();
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Include all columns.
        let mut iter = col.into_row_iter().with_projection(vec![0, 1]);

        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Swap the order of columns.
        let mut iter = col.into_row_iter().with_projection(vec![1, 0]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    /// `count` accounts for diffs, the limit and the offset.
    #[mz_ore::test]
    fn test_count_respects_limit_and_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(
            vec![
                (a.clone(), NonZeroUsize::new(3).unwrap()),
                (b.clone(), NonZeroUsize::new(2).unwrap()),
            ],
            &[],
        );
        let col = col.sorted_view(&[]);

        // How many total rows there are.
        let iter = col.into_row_iter();
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        // With a LIMIT.
        let iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.count(), 1);

        let col = iter.into_inner();

        // With a LIMIT larger than the total number of rows.
        let iter = col.into_row_iter().with_limit(100);
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        // With an OFFSET.
        let iter = col.into_row_iter().apply_offset(3);
        assert_eq!(iter.count(), 2);

        let col = iter.into_inner();

        // With an OFFSET greater than the total number of rows.
        let iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.count(), 0);

        let col = iter.into_inner();

        // With a LIMIT and an OFFSET.
        let iter = col.into_row_iter().with_limit(2).apply_offset(4);
        assert_eq!(iter.count(), 1);
    }

    /// Arbitrary rows round-trip through a `RowCollection`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_row_collection() {
        fn row_collection_roundtrips(rows: Vec<Row>) {
            let collection = RowCollection::from(&rows);

            for i in 0..rows.len() {
                let (a, _) = collection.get(i).unwrap();
                let b = rows.get(i).unwrap().borrow();

                assert_eq!(a, b);
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 5, ..Default::default() },
            |(rows in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_roundtrips(rows)
            }
        );
    }

    /// Merging arbitrary collections preserves all rows in order.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_merge() {
        fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
            let mut a_col = RowCollection::from(&a);
            let b_col = RowCollection::from(&b);

            a_col.merge(&b_col);

            let all_rows = a.iter().chain(b.iter());
            for (idx, row) in all_rows.enumerate() {
                let (col_row, _) = a_col.get(idx).unwrap();
                assert_eq!(col_row, row.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 3, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_merge(a, b)
            }
        );
    }

    /// The sorted view over arbitrary merged runs matches a plain sort of all rows.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_sort() {
        fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
            a.sort_by(|a, b| a.cmp(b));
            b.sort_by(|a, b| a.cmp(b));
            let mut col = RowCollection::from(&a);
            col.merge(&RowCollection::from(&b));

            let sorted_view = col.sorted_view(&[]);

            a.append(&mut b);
            a.sort_by(|a, b| a.cmp(b));

            for i in 0..a.len() {
                let (row_x, _) = sorted_view.get(i).unwrap();
                let row_y = a.get(i).unwrap();

                assert_eq!(row_x, row_y.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 5, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_sort(a, b)
            }
        );
    }
}