Skip to main content

mz_expr/row/
collection.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Defines types for working with collections of [`Row`].

12use std::cell::RefCell;
13use std::cmp::Reverse;
14use std::collections::BinaryHeap;
15use std::collections::binary_heap::PeekMut;
16use std::num::NonZeroUsize;
17use std::sync::Arc;
18
19use bytes::Bytes;
20use mz_repr::{DatumVec, IntoRowIterator, Row, RowIterator, RowRef};
21use serde::{Deserialize, Serialize};
22
23use crate::ColumnOrder;
24
25/// Collection of runs of sorted [`Row`]s represented as a single blob.
26///
27/// Note: the encoding format we use to represent [`Row`]s in this struct is
28/// not stable, and thus should never be persisted durably.
29#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
30pub struct RowCollection {
31    /// Contiguous blob of encoded Rows.
32    encoded: Bytes,
33    /// Metadata about an individual Row in the blob.
34    metadata: Arc<[EncodedRowMetadata]>,
35}
36
37impl RowCollection {
38    /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`.
39    ///
40    /// Note that all row collections to be merged must be constructed with the same `order_by`
41    /// to ensure a consistent sort order. Anything else is undefined behavior.
42    // TODO: Remember the `order_by` and assert that it is the same for all collections.
43    pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
44        // Sort data to maintain sortedness invariants.
45        if order_by.is_empty() {
46            // Skip row decoding if not required.
47            rows.sort();
48        } else {
49            let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new());
50            rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
51                let borrow1 = datum_vec1.borrow_with(row1);
52                let borrow2 = datum_vec2.borrow_with(row2);
53                crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2))
54            });
55        }
56
57        // Pre-sizing our buffer should allow us to make just 1 allocation, and
58        // use the perfect amount of memory.
59        //
60        // Note(parkmycar): I didn't do any benchmarking to determine if this
61        // is faster, so feel free to change this if you'd like.
62        let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
63
64        let mut encoded = Vec::<u8>::with_capacity(encoded_size);
65        let mut metadata = Vec::<EncodedRowMetadata>::with_capacity(rows.len());
66
67        for (row, diff) in rows {
68            encoded.extend(row.data());
69            metadata.push(EncodedRowMetadata {
70                offset: encoded.len(),
71                diff,
72            });
73        }
74
75        RowCollection {
76            encoded: Bytes::from(encoded),
77            metadata: metadata.into(),
78        }
79    }
80
81    /// Concatenate another [`RowCollection`] onto `self`, copying and reallocating both sets of rows.
82    ///
83    /// This does not reorder the rows; the output will be sorted only if the inputs are.
84    pub fn concat(&mut self, other: &RowCollection) {
85        if other.count() == 0 {
86            return;
87        }
88
89        // TODO(parkmycar): Using SegmentedBytes here would be nice.
90        let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()];
91        new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]);
92        new_bytes[self.encoded.len()..].copy_from_slice(&other.encoded[..]);
93
94        let mapped_metas = other.metadata.iter().map(|meta| EncodedRowMetadata {
95            offset: meta.offset + self.encoded.len(),
96            diff: meta.diff,
97        });
98
99        self.metadata = self.metadata.iter().cloned().chain(mapped_metas).collect();
100        self.encoded = Bytes::from(new_bytes);
101    }
102
103    /// Adjust a row count for the provided offset and limit.
104    ///
105    /// This is only marginally related to row collections, but many callers need to make
106    /// this adjustment.
107    pub fn offset_limit(mut total: usize, offset: usize, limit: Option<usize>) -> usize {
108        // Consider a possible OFFSET.
109        total = total.saturating_sub(offset);
110
111        // Consider a possible LIMIT.
112        if let Some(limit) = limit {
113            total = std::cmp::min(limit, total);
114        }
115
116        total
117    }
118
119    /// Total count of [`Row`]s represented by this collection.
120    pub fn count(&self) -> usize {
121        self.metadata.iter().map(|meta| meta.diff.get()).sum()
122    }
123
124    /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection.
125    pub fn entries(&self) -> usize {
126        self.metadata.len()
127    }
128
129    /// Returns the number of bytes this [`RowCollection`] uses.
130    pub fn byte_len(&self) -> usize {
131        let row_data_size = self.encoded.len();
132        let metadata_size = self
133            .metadata
134            .len()
135            .saturating_mul(std::mem::size_of::<EncodedRowMetadata>());
136
137        row_data_size.saturating_add(metadata_size)
138    }
139
140    /// Returns a [`RowRef`] for the entry at `idx`, if one exists.
141    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
142        let (lower_offset, upper) = match idx {
143            0 => (0, self.metadata.get(idx)?),
144            _ => {
145                let lower = self.metadata.get(idx - 1).map(|m| m.offset)?;
146                let upper = self.metadata.get(idx)?;
147                (lower, upper)
148            }
149        };
150
151        let slice = &self.encoded[lower_offset..upper.offset];
152        // SAFETY: self.encoded contains only valid row data, and the metadata delimits only ranges
153        // that correspond to the original rows.
154        let row = unsafe { RowRef::from_slice(slice) };
155
156        Some((row, upper))
157    }
158
159    /// "Sorts" the [`RowCollection`] by the column order in `order_by`. The output will be sorted
160    /// if the inputs were all sorted by the given order; otherwise, the order is unspecified.
161    /// In either case, the output will be a [RowCollection] that contains the full contents of all
162    /// the input collections.
163    pub fn merge_sorted(runs: &[Self], order_by: &[ColumnOrder]) -> RowCollection {
164        if order_by.is_empty() {
165            Self::merge_sorted_inner(runs, &Ord::cmp)
166        } else {
167            let left_datum_vec = RefCell::new(mz_repr::DatumVec::new());
168            let right_datum_vec = RefCell::new(mz_repr::DatumVec::new());
169
170            let cmp = &|left: &RowRef, right: &RowRef| {
171                let (mut left_datum_vec, mut right_datum_vec) =
172                    (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut());
173                let left_datums = left_datum_vec.borrow_with(left);
174                let right_datums = right_datum_vec.borrow_with(right);
175                crate::compare_columns(order_by, &left_datums, &right_datums, || left.cmp(right))
176            };
177            Self::merge_sorted_inner(runs, cmp)
178        }
179    }
180
181    fn merge_sorted_inner<F>(runs: &[Self], cmp: &F) -> RowCollection
182    where
183        F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
184    {
185        let mut heap = BinaryHeap::with_capacity(runs.len());
186
187        let mut metadata_len = 0;
188        let mut encoded_len = 0;
189        for collection in runs.iter() {
190            if collection.metadata.is_empty() {
191                continue;
192            }
193            metadata_len += collection.metadata.len();
194            encoded_len += collection.byte_len();
195            heap.push(Reverse(RunIter {
196                range: 0..collection.metadata.len(),
197                collection,
198                cmp,
199            }));
200        }
201
202        let mut encoded = Vec::with_capacity(encoded_len);
203        let mut metadata = Vec::with_capacity(metadata_len);
204
205        while let Some(mut peek) = heap.peek_mut() {
206            let Reverse(run) = &mut *peek;
207            if let Some(next) = run.range.next() {
208                let (row, meta) = run.collection.get(next).unwrap();
209                encoded.extend(row.data());
210                metadata.push(EncodedRowMetadata {
211                    offset: encoded.len(),
212                    diff: meta.diff,
213                });
214                if run.range.is_empty() {
215                    PeekMut::pop(peek);
216                }
217            }
218        }
219
220        RowCollection {
221            encoded: encoded.into(),
222            metadata: metadata.into(),
223        }
224    }
225}
226
227/// Inner type of [`RowCollection`], describes a single Row.
228#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
229pub struct EncodedRowMetadata {
230    /// Offset into the binary blob of encoded rows.
231    ///
232    /// TODO(parkmycar): Consider making this a `u32`.
233    offset: usize,
234    /// Diff for the Row.
235    ///
236    /// TODO(parkmycar): Consider making this a smaller type, note that some compute introspection
237    /// collections, e.g. `mz_scheduling_elapsed_raw`, encodes nano seconds in the diff field which
238    /// requires a u64.
239    diff: NonZeroUsize,
240}
241
242#[derive(Debug, Clone)]
243pub struct RowCollectionIter {
244    /// Collection we're iterating over.
245    collection: RowCollection,
246
247    /// Index for the row we're currently referencing.
248    row_idx: usize,
249    /// Number of diffs we've emitted for the current row.
250    diff_idx: usize,
251
252    /// Maximum number of rows this iterator will yield.
253    limit: Option<usize>,
254    /// Number of rows we're offset by.
255    ///
256    /// Note: We eagerly apply an offset, but we track it here so we can
257    /// accurately report [`RowIterator::count`].
258    offset: usize,
259
260    /// Columns to underlying rows to include.
261    projection: Option<Vec<usize>>,
262    /// Allocations that we reuse for every iteration to project columns.
263    projection_buf: (DatumVec, Row),
264}
265
266impl RowCollectionIter {
267    /// Returns the inner `RowCollection`.
268    pub fn into_inner(self) -> RowCollection {
269        self.collection
270    }
271
272    /// Immediately applies an offset to this iterator.
273    pub fn apply_offset(mut self, offset: usize) -> RowCollectionIter {
274        Self::advance_by(
275            &self.collection,
276            &mut self.row_idx,
277            &mut self.diff_idx,
278            offset,
279        );
280
281        // Keep track of how many rows we've offset by.
282        self.offset = self.offset.saturating_add(offset);
283
284        self
285    }
286
287    /// Sets the limit for this iterator.
288    pub fn with_limit(mut self, limit: usize) -> RowCollectionIter {
289        self.limit = Some(limit);
290        self
291    }
292
293    /// Specify the columns that should be yielded.
294    pub fn with_projection(mut self, projection: Vec<usize>) -> RowCollectionIter {
295        // Omit the projection if it would be a no-op to avoid a relatively expensive memcpy.
296        if let Some((row, _)) = self.collection.get(0) {
297            let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
298            if projection.iter().copied().eq(cols) {
299                return self;
300            }
301        }
302
303        self.projection = Some(projection);
304        self
305    }
306
307    /// Helper method for implementing [`RowIterator`].
308    ///
309    /// Advances the internal pointers by the specified amount.
310    fn advance_by(
311        collection: &RowCollection,
312        row_idx: &mut usize,
313        diff_idx: &mut usize,
314        mut count: usize,
315    ) {
316        while count > 0 {
317            let Some((_, row_meta)) = collection.get(*row_idx) else {
318                return;
319            };
320
321            let remaining_diff = row_meta.diff.get() - *diff_idx;
322            if remaining_diff <= count {
323                *diff_idx = 0;
324                *row_idx += 1;
325                count -= remaining_diff;
326            } else {
327                *diff_idx += count;
328                count = 0;
329            }
330        }
331    }
332
333    /// Helper method for implementing [`RowIterator`].
334    ///
335    /// Projects columns for the provided `row`.
336    fn project<'a>(
337        projection: Option<&[usize]>,
338        row: &'a RowRef,
339        datum_buf: &'a mut DatumVec,
340        row_buf: &'a mut Row,
341    ) -> &'a RowRef {
342        if let Some(projection) = projection {
343            // Copy the required columns into our reusable buffer.
344            {
345                let datums = datum_buf.borrow_with(row);
346                row_buf
347                    .packer()
348                    .extend(projection.iter().map(|i| &datums[*i]));
349            }
350
351            row_buf
352        } else {
353            row
354        }
355    }
356}
357
358impl RowIterator for RowCollectionIter {
359    fn next(&mut self) -> Option<&RowRef> {
360        // Bail if we've reached our limit.
361        if let Some(0) = self.limit {
362            return None;
363        }
364
365        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
366
367        // If we're about to yield a row, then subtract from our limit.
368        if let Some(limit) = &mut self.limit {
369            *limit = limit.saturating_sub(1);
370        }
371
372        // Advance to the next row.
373        Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
374
375        // Project away and/or re-order any columns.
376        let (datum_buf, row_buf) = &mut self.projection_buf;
377        Some(Self::project(
378            self.projection.as_deref(),
379            row,
380            datum_buf,
381            row_buf,
382        ))
383    }
384
385    fn peek(&mut self) -> Option<&RowRef> {
386        // Bail if we've reached our limit.
387        if let Some(0) = self.limit {
388            return None;
389        }
390
391        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
392
393        // Note: Unlike `next()` we do not subtract from our limit, nor advance
394        // the internal pointers.
395
396        // Project away and/or re-order any columns.
397        let (datum_buf, row_buf) = &mut self.projection_buf;
398        Some(Self::project(
399            self.projection.as_deref(),
400            row,
401            datum_buf,
402            row_buf,
403        ))
404    }
405
406    fn count(&self) -> usize {
407        RowCollection::offset_limit(self.collection.count(), self.offset, self.limit)
408    }
409
410    fn box_clone(&self) -> Box<dyn RowIterator> {
411        Box::new(self.clone())
412    }
413}
414
415impl IntoRowIterator for RowCollection {
416    type Iter = RowCollectionIter;
417
418    fn into_row_iter(self) -> Self::Iter {
419        RowCollectionIter {
420            collection: self,
421            row_idx: 0,
422            diff_idx: 0,
423            limit: None,
424            offset: 0,
425            projection: None,
426            // Note: Neither of these types allocate until elements are pushed in.
427            projection_buf: (DatumVec::new(), Row::default()),
428        }
429    }
430}
431
432/// Iterator-like struct to help with extracting rows in sorted order from `RowCollection`.
433struct RunIter<'a, F> {
434    collection: &'a RowCollection,
435    cmp: &'a F,
436    range: std::ops::Range<usize>,
437}
438
439impl<'a, F> PartialOrd for RunIter<'a, F>
440where
441    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
442{
443    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
444        Some(self.cmp(other))
445    }
446}
447
448impl<'a, F> Ord for RunIter<'a, F>
449where
450    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
451{
452    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
453        let left = self.collection.get(self.range.start).unwrap().0;
454        let right = other.collection.get(other.range.start).unwrap().0;
455        (self.cmp)(left, right)
456    }
457}
458
459impl<'a, F> PartialEq for RunIter<'a, F>
460where
461    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
462{
463    fn eq(&self, other: &Self) -> bool {
464        self.cmp(other) == std::cmp::Ordering::Equal
465    }
466}
467
468impl<'a, F> Eq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {}
469
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use mz_ore::assert_none;
    use mz_repr::Datum;
    use proptest::prelude::*;
    use proptest::test_runner::Config;

    use super::*;

    // Test-only constructor: stores `rows` in the given order (no sorting), each with a diff of 1.
    impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
        fn from(rows: T) -> Self {
            let mut encoded = Vec::<u8>::new();
            let mut metadata = Vec::<EncodedRowMetadata>::new();

            for row in rows {
                encoded.extend(row.data());
                metadata.push(EncodedRowMetadata {
                    offset: encoded.len(),
                    diff: NonZeroUsize::MIN,
                });
            }

            RowCollection {
                encoded: Bytes::from(encoded),
                metadata: metadata.into(),
            }
        }
    }

    #[mz_ore::test]
    fn test_row_collection() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let collection = RowCollection::from([&a, &b]);
        assert_eq!(collection.entries(), 2);

        let (a_rnd, _) = collection.get(0).unwrap();
        assert_eq!(a_rnd, a.borrow());

        let (b_rnd, _) = collection.get(1).unwrap();
        assert_eq!(b_rnd, b.borrow());

        // Out-of-bounds lookups return `None` rather than panicking.
        assert!(collection.get(2).is_none());
    }

    #[mz_ore::test]
    fn test_merge() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let mut a_col = RowCollection::from([&a]);
        let b_col = RowCollection::from([&b]);

        a_col.concat(&b_col);

        assert_eq!(a_col.count(), 2);
        assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
        assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));

        // Concatenating an empty collection is a no-op.
        let before = a_col.clone();
        a_col.concat(&RowCollection::default());
        assert_eq!(a_col, before);

        // Concatenating onto an empty collection yields the other collection.
        let mut empty = RowCollection::default();
        empty.concat(&b_col);
        assert_eq!(empty, b_col);
    }

    #[mz_ore::test]
    fn test_sort() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
        let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
        let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);

        let cols = {
            let mut part = [&a, &b];
            part.sort();
            let part1 = RowCollection::from(part);
            let mut part = [&c, &d];
            part.sort();
            let part2 = RowCollection::from(part);
            vec![part1, part2]
        };
        let mut rows = [a, b, c, d];

        let sorted_view = RowCollection::merge_sorted(&cols, &[]);
        rows.sort();

        // The merge must neither drop nor duplicate entries.
        assert_eq!(sorted_view.entries(), rows.len());
        for (i, row_y) in rows.iter().enumerate() {
            let (row_x, _) = sorted_view.get(i).unwrap();
            assert_eq!(row_x, row_y.borrow());
        }
    }

    #[mz_ore::test]
    fn test_merge_sorted_skips_empty_runs() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);

        let runs = [
            RowCollection::default(),
            RowCollection::from([&a]),
            RowCollection::default(),
            RowCollection::from([&b]),
        ];
        let merged = RowCollection::merge_sorted(&runs, &[]);

        let mut expected = [a, b];
        expected.sort();
        assert_eq!(merged.entries(), expected.len());
        for (idx, row) in expected.iter().enumerate() {
            let (merged_row, _) = merged.get(idx).unwrap();
            assert_eq!(merged_row, row.borrow());
        }

        // Merging nothing but empty runs yields an empty collection.
        let empty = RowCollection::merge_sorted(&[RowCollection::default()], &[]);
        assert_eq!(empty.count(), 0);
        assert_eq!(empty.byte_len(), 0);
    }

    #[mz_ore::test]
    fn test_sorted_iter() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = RowCollection::merge_sorted(
            &[
                col,
                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
            ],
            &[],
        );
        let mut iter = col.into_row_iter();

        // Peek shouldn't advance the iterator.
        assert_eq!(iter.peek(), Some(b.borrow()));

        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);

        // For good measure make sure we don't panic.
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
    }

    #[mz_ore::test]
    fn test_sorted_iter_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = RowCollection::merge_sorted(
            &[
                col,
                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
            ],
            &[],
        );

        // Test with a reasonable offset that does not span rows.
        let mut iter = col.into_row_iter().apply_offset(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with an offset that spans the first row.
        let mut iter = col.into_row_iter().apply_offset(3);

        assert_eq!(iter.peek(), Some(a.borrow()));

        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with an offset that passes the entire collection.
        let mut iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    #[mz_ore::test]
    fn test_sorted_iter_limit() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = RowCollection::merge_sorted(
            &[
                col,
                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
            ],
            &[],
        );

        // Test with a limit that spans only the first row.
        let mut iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit that spans both rows.
        let mut iter = col.into_row_iter().with_limit(4);
        assert_eq!(iter.peek(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));

        assert_eq!(iter.peek(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));

        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit that is more rows than we have.
        let mut iter = col.into_row_iter().with_limit(1000);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Test with a limit of 0.
        let mut iter = col.into_row_iter().with_limit(0);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    #[mz_ore::test]
    fn test_mapped_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);

        // Make sure we can call `.map` on a `dyn RowIterator`.
        let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());

        let mut mapped = iter.map(|f| f.to_owned());
        assert!(mapped.next().is_some());
        assert!(mapped.next().is_some());
        assert!(mapped.next().is_some());
        assert_none!(mapped.next());
        assert_none!(mapped.next());
    }

    #[mz_ore::test]
    fn test_projected_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);

        // Project away the first column.
        let mut iter = col.into_row_iter().with_projection(vec![1]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Project away all columns.
        let mut iter = col.into_row_iter().with_projection(vec![]);

        let projected_a = Row::default();
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Include all columns.
        let mut iter = col.into_row_iter().with_projection(vec![0, 1]);

        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Swap the order of columns.
        let mut iter = col.into_row_iter().with_projection(vec![1, 0]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    #[mz_ore::test]
    fn test_count_respects_limit_and_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(
            vec![
                (a.clone(), NonZeroUsize::new(3).unwrap()),
                (b.clone(), NonZeroUsize::new(2).unwrap()),
            ],
            &[],
        );

        // How many total rows there are.
        let iter = col.into_row_iter();
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        // With a LIMIT.
        let iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.count(), 1);

        let col = iter.into_inner();

        // With a LIMIT larger than the total number of rows.
        let iter = col.into_row_iter().with_limit(100);
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        // With an OFFSET.
        let iter = col.into_row_iter().apply_offset(3);
        assert_eq!(iter.count(), 2);

        let col = iter.into_inner();

        // With an OFFSET greater than the total number of rows.
        let iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.count(), 0);

        let col = iter.into_inner();

        // With a LIMIT and an OFFSET.
        let iter = col.into_row_iter().with_limit(2).apply_offset(4);
        assert_eq!(iter.count(), 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_row_collection() {
        fn row_collection_roundtrips(rows: Vec<Row>) {
            let collection = RowCollection::from(&rows);
            assert_eq!(collection.entries(), rows.len());

            for (i, b) in rows.iter().enumerate() {
                let (a, _) = collection.get(i).unwrap();
                assert_eq!(a, b.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 5, ..Default::default() },
            |(rows in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_roundtrips(rows)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_merge() {
        fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
            let mut a_col = RowCollection::from(&a);
            let b_col = RowCollection::from(&b);

            a_col.concat(&b_col);
            assert_eq!(a_col.entries(), a.len() + b.len());

            let all_rows = a.iter().chain(b.iter());
            for (idx, row) in all_rows.enumerate() {
                let (col_row, _) = a_col.get(idx).unwrap();
                assert_eq!(col_row, row.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 3, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_merge(a, b)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_sort() {
        fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
            a.sort();
            b.sort();

            let sorted_view = RowCollection::merge_sorted(
                &[RowCollection::from(&a), RowCollection::from(&b)],
                &[],
            );

            a.append(&mut b);
            a.sort();

            // The merge must neither drop nor duplicate entries.
            assert_eq!(sorted_view.entries(), a.len());
            for (i, row_y) in a.iter().enumerate() {
                let (row_x, _) = sorted_view.get(i).unwrap();
                assert_eq!(row_x, row_y.borrow());
            }
        }

        // This test is slow, so we limit the default number of test cases.
        proptest!(
            Config { cases: 5, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                // The proptest! macro interferes with rustfmt.
                row_collection_sort(a, b)
            }
        );
    }
}