Skip to main content

mz_expr/row/
collection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Defines types for working with collections of [`Row`].
11
12use std::cell::RefCell;
13use std::cmp::Reverse;
14use std::collections::BinaryHeap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use mz_repr::{DatumVec, IntoRowIterator, Row, RowIterator, RowRef};
20use serde::{Deserialize, Serialize};
21
22use crate::ColumnOrder;
23
/// Collection of runs of sorted [`Row`]s represented as a single blob.
///
/// Rows are stored back to back in `encoded`. `metadata` holds one entry per stored row,
/// recording where that row ends in the blob together with its diff, and `runs` splits those
/// entries into consecutive sorted runs.
///
/// Note: the encoding format we use to represent [`Row`]s in this struct is
/// not stable, and thus should never be persisted durably.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RowCollection {
    /// Contiguous blob of encoded Rows.
    encoded: Bytes,
    /// Metadata about each individual Row in the blob, in storage order.
    metadata: Arc<[EncodedRowMetadata]>,
    /// Exclusive end indexes (into `metadata`) of the non-empty, sorted runs of rows. A run
    /// starts where the previous one ends; the first run starts at index 0.
    runs: Vec<usize>,
}
37
38impl RowCollection {
39    /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`.
40    ///
41    /// Note that all row collections to be merged must be constructed with the same `order_by`
42    /// to ensure a consistent sort order. Anything else is undefined behavior.
43    // TODO: Remember the `order_by` and assert that it is the same for all collections.
44    pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
45        // Sort data to maintain sortedness invariants.
46        if order_by.is_empty() {
47            // Skip row decoding if not required.
48            rows.sort();
49        } else {
50            let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new());
51            rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
52                let borrow1 = datum_vec1.borrow_with(row1);
53                let borrow2 = datum_vec2.borrow_with(row2);
54                crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2))
55            });
56        }
57
58        // Pre-sizing our buffer should allow us to make just 1 allocation, and
59        // use the perfect amount of memory.
60        //
61        // Note(parkmycar): I didn't do any benchmarking to determine if this
62        // is faster, so feel free to change this if you'd like.
63        let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
64
65        let mut encoded = Vec::<u8>::with_capacity(encoded_size);
66        let mut metadata = Vec::<EncodedRowMetadata>::with_capacity(rows.len());
67        let runs = (!rows.is_empty())
68            .then(|| vec![rows.len()])
69            .unwrap_or_default();
70
71        for (row, diff) in rows {
72            encoded.extend(row.data());
73            metadata.push(EncodedRowMetadata {
74                offset: encoded.len(),
75                diff,
76            });
77        }
78
79        RowCollection {
80            encoded: Bytes::from(encoded),
81            metadata: metadata.into(),
82            runs,
83        }
84    }
85
86    /// Merge another [`RowCollection`] into `self`.
87    pub fn merge(&mut self, other: &RowCollection) {
88        if other.count(0, None) == 0 {
89            return;
90        }
91
92        // TODO(parkmycar): Using SegmentedBytes here would be nice.
93        let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()];
94        new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]);
95        new_bytes[self.encoded.len()..].copy_from_slice(&other.encoded[..]);
96
97        let mapped_metas = other.metadata.iter().map(|meta| EncodedRowMetadata {
98            offset: meta.offset + self.encoded.len(),
99            diff: meta.diff,
100        });
101        let self_len = self.metadata.len();
102
103        self.metadata = self.metadata.iter().cloned().chain(mapped_metas).collect();
104        self.encoded = Bytes::from(new_bytes);
105        self.runs.extend(other.runs.iter().map(|f| f + self_len));
106    }
107
108    /// Total count of [`Row`]s represented by this collection, considering a
109    /// possible `OFFSET` and `LIMIT`.
110    pub fn count(&self, offset: usize, limit: Option<usize>) -> usize {
111        let mut total: usize = self.metadata.iter().map(|meta| meta.diff.get()).sum();
112
113        // Consider a possible OFFSET.
114        total = total.saturating_sub(offset);
115
116        // Consider a possible LIMIT.
117        if let Some(limit) = limit {
118            total = std::cmp::min(limit, total);
119        }
120
121        total
122    }
123
    /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection.
    ///
    /// Unlike [`RowCollection::count`], this does not take diffs into account: each stored
    /// entry is counted once.
    pub fn entries(&self) -> usize {
        self.metadata.len()
    }
128
129    /// Returns the number of bytes this [`RowCollection`] uses.
130    pub fn byte_len(&self) -> usize {
131        let row_data_size = self.encoded.len();
132        let metadata_size = self
133            .metadata
134            .len()
135            .saturating_mul(std::mem::size_of::<EncodedRowMetadata>());
136
137        row_data_size.saturating_add(metadata_size)
138    }
139
140    /// Returns a [`RowRef`] for the entry at `idx`, if one exists.
141    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
142        let (lower_offset, upper) = match idx {
143            0 => (0, self.metadata.get(idx)?),
144            _ => {
145                let lower = self.metadata.get(idx - 1).map(|m| m.offset)?;
146                let upper = self.metadata.get(idx)?;
147                (lower, upper)
148            }
149        };
150
151        let slice = &self.encoded[lower_offset..upper.offset];
152        // SAFETY: self.encoded contains only valid row data, and the metadata delimits only ranges
153        // that correspond to the original rows.
154        let row = unsafe { RowRef::from_slice(slice) };
155
156        Some((row, upper))
157    }
158
159    /// "Sorts" the [`RowCollection`] by the column order in `order_by`. Returns a sorted view over
160    /// the collection.
161    pub fn sorted_view(self, order_by: &[ColumnOrder]) -> SortedRowCollection {
162        if order_by.is_empty() {
163            self.sorted_view_inner(&Ord::cmp)
164        } else {
165            let left_datum_vec = RefCell::new(mz_repr::DatumVec::new());
166            let right_datum_vec = RefCell::new(mz_repr::DatumVec::new());
167
168            let cmp = &|left: &RowRef, right: &RowRef| {
169                let (mut left_datum_vec, mut right_datum_vec) =
170                    (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut());
171                let left_datums = left_datum_vec.borrow_with(left);
172                let right_datums = right_datum_vec.borrow_with(right);
173                crate::compare_columns(order_by, &left_datums, &right_datums, || left.cmp(right))
174            };
175            self.sorted_view_inner(cmp)
176        }
177    }
178
179    fn sorted_view_inner<F>(self, cmp: &F) -> SortedRowCollection
180    where
181        F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
182    {
183        let mut heap = BinaryHeap::with_capacity(self.runs.len());
184
185        for index in 0..self.runs.len() {
186            let start = (index > 0).then(|| self.runs[index - 1]).unwrap_or(0);
187            let end = self.runs[index];
188
189            heap.push(Reverse(RunIter {
190                collection: &self,
191                cmp,
192                range: start..end,
193            }));
194        }
195
196        let mut view = Vec::with_capacity(self.metadata.len());
197
198        while let Some(Reverse(mut run)) = heap.pop() {
199            if let Some(next) = run.range.next() {
200                view.push(next);
201                if !run.range.is_empty() {
202                    heap.push(Reverse(run));
203                }
204            }
205        }
206
207        SortedRowCollection {
208            collection: self,
209            sorted_view: view.into(),
210        }
211    }
212}
213
/// Inner type of [`RowCollection`], describes a single Row.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct EncodedRowMetadata {
    /// Offset into the binary blob of encoded rows at which this Row ends (exclusive). The Row
    /// starts at the offset recorded for the preceding entry, or at 0 for the first entry.
    ///
    /// TODO(parkmycar): Consider making this a `u32`.
    offset: usize,
    /// Diff for the Row, i.e. how many times the Row is represented.
    ///
    /// TODO(parkmycar): Consider making this a smaller type, note that some compute introspection
    /// collections, e.g. `mz_scheduling_elapsed_raw`, encode nanoseconds in the diff field, which
    /// requires a u64.
    diff: NonZeroUsize,
}
228
/// Provides a sorted view of a [`RowCollection`].
///
/// The underlying collection is left untouched; the sort order is expressed as a list of
/// indexes into it.
#[derive(Debug, Clone)]
pub struct SortedRowCollection {
    /// The inner [`RowCollection`].
    collection: RowCollection,
    /// Indexes into the inner collection that represent the sorted order.
    sorted_view: Arc<[usize]>,
}
237
238impl SortedRowCollection {
239    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
240        self.sorted_view
241            .get(idx)
242            .and_then(|inner_idx| self.collection.get(*inner_idx))
243    }
244}
245
/// Iterator over the rows of a [`SortedRowCollection`], in sorted order.
///
/// A row is yielded once per unit of its diff. The iterator optionally applies an offset, a
/// limit and a column projection.
#[derive(Debug, Clone)]
pub struct SortedRowCollectionIter {
    /// Collection we're iterating over.
    collection: SortedRowCollection,

    /// Index for the row we're currently referencing.
    row_idx: usize,
    /// Number of diffs we've emitted for the current row.
    diff_idx: usize,

    /// Maximum number of rows this iterator will still yield; `None` means unlimited.
    limit: Option<usize>,
    /// Number of rows we're offset by.
    ///
    /// Note: We eagerly apply an offset, but we track it here so we can
    /// accurately report [`RowIterator::count`].
    offset: usize,

    /// Indexes of the columns of the underlying rows to yield, in output order. `None` yields
    /// the rows unchanged.
    projection: Option<Vec<usize>>,
    /// Allocations that we reuse for every iteration to project columns.
    projection_buf: (DatumVec, Row),
}
269
270impl SortedRowCollectionIter {
    /// Returns the inner `SortedRowCollection`, discarding the iteration state (position,
    /// offset, limit and projection).
    pub fn into_inner(self) -> SortedRowCollection {
        self.collection
    }
275
276    /// Immediately applies an offset to this iterator.
277    pub fn apply_offset(mut self, offset: usize) -> SortedRowCollectionIter {
278        Self::advance_by(
279            &self.collection,
280            &mut self.row_idx,
281            &mut self.diff_idx,
282            offset,
283        );
284
285        // Keep track of how many rows we've offset by.
286        self.offset = self.offset.saturating_add(offset);
287
288        self
289    }
290
291    /// Sets the limit for this iterator.
292    pub fn with_limit(mut self, limit: usize) -> SortedRowCollectionIter {
293        self.limit = Some(limit);
294        self
295    }
296
297    /// Specify the columns that should be yielded.
298    pub fn with_projection(mut self, projection: Vec<usize>) -> SortedRowCollectionIter {
299        // Omit the projection if it would be a no-op to avoid a relatively expensive memcpy.
300        if let Some((row, _)) = self.collection.get(0) {
301            let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
302            if projection.iter().copied().eq(cols) {
303                return self;
304            }
305        }
306
307        self.projection = Some(projection);
308        self
309    }
310
311    /// Helper method for implementing [`RowIterator`].
312    ///
313    /// Advances the internal pointers by the specified amount.
314    fn advance_by(
315        collection: &SortedRowCollection,
316        row_idx: &mut usize,
317        diff_idx: &mut usize,
318        mut count: usize,
319    ) {
320        while count > 0 {
321            let Some((_, row_meta)) = collection.get(*row_idx) else {
322                return;
323            };
324
325            let remaining_diff = row_meta.diff.get() - *diff_idx;
326            if remaining_diff <= count {
327                *diff_idx = 0;
328                *row_idx += 1;
329                count -= remaining_diff;
330            } else {
331                *diff_idx += count;
332                count = 0;
333            }
334        }
335    }
336
337    /// Helper method for implementing [`RowIterator`].
338    ///
339    /// Projects columns for the provided `row`.
340    fn project<'a>(
341        projection: Option<&[usize]>,
342        row: &'a RowRef,
343        datum_buf: &'a mut DatumVec,
344        row_buf: &'a mut Row,
345    ) -> &'a RowRef {
346        if let Some(projection) = projection {
347            // Copy the required columns into our reusable buffer.
348            {
349                let datums = datum_buf.borrow_with(row);
350                row_buf
351                    .packer()
352                    .extend(projection.iter().map(|i| &datums[*i]));
353            }
354
355            row_buf
356        } else {
357            row
358        }
359    }
360}
361
362impl RowIterator for SortedRowCollectionIter {
363    fn next(&mut self) -> Option<&RowRef> {
364        // Bail if we've reached our limit.
365        if let Some(0) = self.limit {
366            return None;
367        }
368
369        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
370
371        // If we're about to yield a row, then subtract from our limit.
372        if let Some(limit) = &mut self.limit {
373            *limit = limit.saturating_sub(1);
374        }
375
376        // Advance to the next row.
377        Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
378
379        // Project away and/or re-order any columns.
380        let (datum_buf, row_buf) = &mut self.projection_buf;
381        Some(Self::project(
382            self.projection.as_deref(),
383            row,
384            datum_buf,
385            row_buf,
386        ))
387    }
388
389    fn peek(&mut self) -> Option<&RowRef> {
390        // Bail if we've reached our limit.
391        if let Some(0) = self.limit {
392            return None;
393        }
394
395        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
396
397        // Note: Unlike `next()` we do not subtract from our limit, nor advance
398        // the internal pointers.
399
400        // Project away and/or re-order any columns.
401        let (datum_buf, row_buf) = &mut self.projection_buf;
402        Some(Self::project(
403            self.projection.as_deref(),
404            row,
405            datum_buf,
406            row_buf,
407        ))
408    }
409
410    fn count(&self) -> usize {
411        self.collection.collection.count(self.offset, self.limit)
412    }
413
414    fn box_clone(&self) -> Box<dyn RowIterator> {
415        Box::new(self.clone())
416    }
417}
418
impl IntoRowIterator for SortedRowCollection {
    type Iter = SortedRowCollectionIter;

    /// Creates an iterator positioned at the first row, with no offset, limit or projection.
    fn into_row_iter(self) -> Self::Iter {
        SortedRowCollectionIter {
            collection: self,
            row_idx: 0,
            diff_idx: 0,
            limit: None,
            offset: 0,
            projection: None,
            // Note: Neither of these types allocate until elements are pushed in.
            projection_buf: (DatumVec::new(), Row::default()),
        }
    }
}
435
/// Iterator-like struct to help with extracting rows in sorted order from `RowCollection`.
///
/// Represents the not yet consumed part of a single run. Two `RunIter`s are ordered by
/// comparing the rows at the start of their ranges using `cmp`.
struct RunIter<'a, F> {
    /// Collection the run belongs to.
    collection: &'a RowCollection,
    /// Comparator used to order rows.
    cmp: &'a F,
    /// Indexes (into the collection's entries) of the rows remaining in this run.
    range: std::ops::Range<usize>,
}
442
443impl<'a, F> PartialOrd for RunIter<'a, F>
444where
445    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
446{
447    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
448        Some(self.cmp(other))
449    }
450}
451
452impl<'a, F> Ord for RunIter<'a, F>
453where
454    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
455{
456    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
457        let left = self.collection.get(self.range.start).unwrap().0;
458        let right = self.collection.get(other.range.start).unwrap().0;
459        (self.cmp)(left, right)
460    }
461}
462
463impl<'a, F> PartialEq for RunIter<'a, F>
464where
465    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
466{
467    fn eq(&self, other: &Self) -> bool {
468        self.cmp(other) == std::cmp::Ordering::Equal
469    }
470}
471
// Marker impl required by `Ord`; equality itself is defined by the `PartialEq` impl.
impl<'a, F> Eq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {}
473
474#[cfg(test)]
475mod tests {
476    use std::borrow::Borrow;
477
478    use mz_ore::assert_none;
479    use mz_repr::Datum;
480    use proptest::prelude::*;
481    use proptest::test_runner::Config;
482
483    use super::*;
484
485    impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
486        fn from(rows: T) -> Self {
487            let mut encoded = Vec::<u8>::new();
488            let mut metadata = Vec::<EncodedRowMetadata>::new();
489
490            for row in rows {
491                encoded.extend(row.data());
492                metadata.push(EncodedRowMetadata {
493                    offset: encoded.len(),
494                    diff: NonZeroUsize::MIN,
495                });
496            }
497            let runs = vec![metadata.len()];
498
499            RowCollection {
500                encoded: Bytes::from(encoded),
501                metadata: metadata.into(),
502                runs,
503            }
504        }
505    }
506
    /// Rows of different shapes stored in a collection can be read back unchanged by index.
    #[mz_ore::test]
    fn test_row_collection() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let collection = RowCollection::from([&a, &b]);

        let (a_rnd, _) = collection.get(0).unwrap();
        assert_eq!(a_rnd, a.borrow());

        let (b_rnd, _) = collection.get(1).unwrap();
        assert_eq!(b_rnd, b.borrow());
    }
520
    /// Merging appends the rows of the other collection after the existing ones.
    #[mz_ore::test]
    fn test_merge() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let mut a_col = RowCollection::from([&a]);
        let b_col = RowCollection::from([&b]);

        a_col.merge(&b_col);

        // Both rows are present, in merge order, and each is counted once.
        assert_eq!(a_col.count(0, None), 2);
        assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
        assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));
    }
535
536    #[mz_ore::test]
537    fn test_sort() {
538        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
539        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
540        let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
541        let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);
542
543        let col = {
544            let mut part = [&a, &b];
545            part.sort_by(|a, b| a.cmp(b));
546            let mut part1 = RowCollection::from(part);
547            let mut part = [&c, &d];
548            part.sort_by(|a, b| a.cmp(b));
549            let part2 = RowCollection::from(part);
550            part1.merge(&part2);
551            part1
552        };
553        let mut rows = [a, b, c, d];
554
555        let sorted_view = col.sorted_view(&[]);
556        rows.sort_by(|a, b| a.cmp(b));
557
558        for i in 0..rows.len() {
559            let (row_x, _) = sorted_view.get(i).unwrap();
560            let row_y = rows.get(i).unwrap();
561
562            assert_eq!(row_x, row_y.borrow());
563        }
564    }
565
    /// Iterating a sorted view yields each row once per unit of its diff, in sorted order.
    #[mz_ore::test]
    fn test_sorted_iter() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        col.merge(&RowCollection::new(
            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
            &[],
        ));
        let col = col.sorted_view(&[]);
        let mut iter = col.into_row_iter();

        // Peek shouldn't advance the iterator.
        assert_eq!(iter.peek(), Some(b.borrow()));

        // `b` is yielded twice (diff of 2), followed by `a` three times (diff of 3).
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);

        // For good measure make sure we don't panic.
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
    }
592
    /// Offsets skip individual copies of rows, including across row boundaries and past the end
    /// of the collection.
    #[mz_ore::test]
    fn test_sorted_iter_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        col.merge(&RowCollection::new(
            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
            &[],
        ));
        let col = col.sorted_view(&[]);

        // Test with a reasonable offset that does not span rows.
        let mut iter = col.into_row_iter().apply_offset(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with an offset that spans the first row.
        let mut iter = col.into_row_iter().apply_offset(3);

        assert_eq!(iter.peek(), Some(a.borrow()));

        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with an offset that passes the entire collection.
        let mut iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }
634
    /// Limits cap the number of yielded rows, counting every copy of a row separately.
    #[mz_ore::test]
    fn test_sorted_iter_limit() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        col.merge(&RowCollection::new(
            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
            &[],
        ));
        let col = col.sorted_view(&[]);

        // Test with a limit that spans only the first row.
        let mut iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit that spans both rows.
        let mut iter = col.into_row_iter().with_limit(4);
        assert_eq!(iter.peek(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));

        assert_eq!(iter.peek(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));

        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit that is more rows than we have.
        let mut iter = col.into_row_iter().with_limit(1000);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit of 0.
        let mut iter = col.into_row_iter().with_limit(0);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }
687
688    #[mz_ore::test]
689    fn test_mapped_row_iterator() {
690        let a = Row::pack_slice(&[Datum::String("hello world")]);
691        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
692        let col = col.sorted_view(&[]);
693
694        // Make sure we can call `.map` on a `dyn RowIterator`.
695        let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());
696
697        let mut mapped = iter.map(|f| f.to_owned());
698        assert!(mapped.next().is_some());
699        assert!(mapped.next().is_some());
700        assert!(mapped.next().is_some());
701        assert_none!(mapped.next());
702        assert_none!(mapped.next());
703    }
704
    /// Projections can drop, keep and re-order the columns of the yielded rows.
    #[mz_ore::test]
    fn test_projected_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);
        let col = col.sorted_view(&[]);

        // Project away the first column.
        let mut iter = col.into_row_iter().with_projection(vec![1]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Project away all columns.
        let mut iter = col.into_row_iter().with_projection(vec![]);

        let projected_a = Row::default();
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Include all columns.
        let mut iter = col.into_row_iter().with_projection(vec![0, 1]);

        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Swap the order of columns.
        let mut iter = col.into_row_iter().with_projection(vec![1, 0]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }
752
    /// The count reported by the iterator reflects diffs, the applied offset and the limit.
    #[mz_ore::test]
    fn test_count_respects_limit_and_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(
            vec![
                (a.clone(), NonZeroUsize::new(3).unwrap()),
                (b.clone(), NonZeroUsize::new(2).unwrap()),
            ],
            &[],
        );
        let col = col.sorted_view(&[]);

        // How many total rows there are.
        let iter = col.into_row_iter();
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        // With a LIMIT.
        let iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.count(), 1);

        let col = iter.into_inner();

        // With a LIMIT larger than the total number of rows.
        let iter = col.into_row_iter().with_limit(100);
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        // With an OFFSET.
        let iter = col.into_row_iter().apply_offset(3);
        assert_eq!(iter.count(), 2);

        let col = iter.into_inner();

        // With an OFFSET greater than the total number of rows.
        let iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.count(), 0);

        let col = iter.into_inner();

        // With a LIMIT and an OFFSET: one row remains after the offset, which is below the limit.
        let iter = col.into_row_iter().with_limit(2).apply_offset(4);
        assert_eq!(iter.count(), 1);
    }
800
    /// Property test: arbitrary rows stored in a collection can be read back unchanged.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_row_collection() {
        // Checks that every row is returned unchanged at the index it was inserted at.
        fn row_collection_roundtrips(rows: Vec<Row>) {
            let collection = RowCollection::from(&rows);

            for i in 0..rows.len() {
                let (a, _) = collection.get(i).unwrap();
                let b = rows.get(i).unwrap().borrow();

                assert_eq!(a, b);
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 5, ..Default::default() },
            |(rows in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_roundtrips(rows)
            }
        );
    }
824
    /// Property test: merging two collections keeps all rows of the first, followed by all rows
    /// of the second.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_merge() {
        // Checks that the merged collection contains `a`'s rows followed by `b`'s rows.
        fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
            let mut a_col = RowCollection::from(&a);
            let b_col = RowCollection::from(&b);

            a_col.merge(&b_col);

            let all_rows = a.iter().chain(b.iter());
            for (idx, row) in all_rows.enumerate() {
                let (col_row, _) = a_col.get(idx).unwrap();
                assert_eq!(col_row, row.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 3, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_merge(a, b)
            }
        );
    }
850
    /// Property test: the sorted view over two merged sorted runs equals a plain sort of all
    /// rows.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_sort() {
        // Builds two sorted runs from `a` and `b`, merges them, and compares the sorted view
        // against sorting the concatenation of both inputs.
        fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
            a.sort_by(|a, b| a.cmp(b));
            b.sort_by(|a, b| a.cmp(b));
            let mut col = RowCollection::from(&a);
            col.merge(&RowCollection::from(&b));

            let sorted_view = col.sorted_view(&[]);

            a.append(&mut b);
            a.sort_by(|a, b| a.cmp(b));

            for i in 0..a.len() {
                let (row_x, _) = sorted_view.get(i).unwrap();
                let row_y = a.get(i).unwrap();

                assert_eq!(row_x, row_y.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 5, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_sort(a, b)
            }
        );
    }
882}