Skip to main content

mz_expr/row/
collection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Defines types for working with collections of [`Row`].
11
12use std::num::NonZeroUsize;
13
14use itertools::Itertools;
15use mz_repr::{
16    DatumVec, IntoRowIterator, Row, RowIterator, RowRef, Rows, RowsBuilder, SharedSlice,
17};
18use serde::{Deserialize, Serialize};
19
20use crate::{ColumnOrder, RowComparator};
21
22/// Collection of runs of sorted [`Row`]s represented as a single blob.
23///
24/// Note: the encoding format we use to represent [`Row`]s in this struct is
25/// not stable, and thus should never be persisted durably.
26#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
27pub struct RowCollection {
28    /// Contiguous blob of encoded Rows.
29    rows: Rows,
30    /// Metadata about an individual Row in the blob.
31    diffs: SharedSlice<NonZeroUsize>,
32}
33
34#[derive(Debug)]
35pub struct RowCollectionBuilder {
36    rows: RowsBuilder,
37    diffs: Vec<NonZeroUsize>,
38}
39
40impl RowCollectionBuilder {
41    pub fn push(&mut self, row: &RowRef, diff: NonZeroUsize) {
42        self.rows.push(row);
43        self.diffs.push(diff);
44    }
45
46    pub fn build(self) -> RowCollection {
47        RowCollection {
48            rows: self.rows.build(),
49            diffs: self.diffs.into(),
50        }
51    }
52}
53
54impl RowCollection {
55    /// Create a new [`RowCollection`] from a collection of [`Row`]s.
56    ///
57    /// The order in which elements are pushed will be preserved.
58    pub fn builder(byte_len_hint: usize, len_hint: usize) -> RowCollectionBuilder {
59        RowCollectionBuilder {
60            rows: Rows::builder(byte_len_hint, len_hint),
61            diffs: Vec::with_capacity(len_hint),
62        }
63    }
64
65    /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`.
66    ///
67    /// Note that all row collections to be merged must be constructed with the same `order_by`
68    /// to ensure a consistent sort order. Anything else is undefined behavior.
69    // TODO: Remember the `order_by` and assert that it is the same for all collections.
70    pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
71        let comparator = RowComparator::new(order_by);
72        // Sort data to maintain sortedness invariants.
73        rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
74            comparator.compare_rows(row1, row2, || row1.cmp(row2))
75        });
76
77        // Pre-sizing our buffer should allow us to make just 1 allocation, and
78        // use the perfect amount of memory.
79        //
80        // Note(parkmycar): I didn't do any benchmarking to determine if this
81        // is faster, so feel free to change this if you'd like.
82        let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
83
84        let mut builder = Self::builder(encoded_size, rows.len());
85        for (row, diff) in rows {
86            builder.push(row.as_row_ref(), diff);
87        }
88        builder.build()
89    }
90
91    fn iter(&self) -> impl Iterator<Item = (&RowRef, NonZeroUsize)> {
92        self.rows.iter().zip_eq(self.diffs.iter().copied())
93    }
94
95    /// Concatenate another [`RowCollection`] onto `self`, copying and reallocating both sets of rows.
96    ///
97    /// This does not reorder the rows; the output will be sorted only if the inputs are.
98    pub fn concat(&mut self, other: &RowCollection) {
99        if other.count() == 0 {
100            return;
101        }
102
103        // TODO(parkmycar): Using SegmentedBytes here would be nice.
104        let byte_len = self.rows.byte_len() + other.rows.byte_len();
105        let len = self.rows.len() + other.rows.len();
106        let mut builder = Self::builder(byte_len, len);
107        for (row, diff) in self.iter().chain(other.iter()) {
108            builder.push(row, diff);
109        }
110        *self = builder.build();
111    }
112
113    /// Adjust a row count for the provided offset and limit.
114    ///
115    /// This is only marginally related to row collections, but many callers need to make
116    /// this adjustment.
117    pub fn offset_limit(mut total: usize, offset: usize, limit: Option<usize>) -> usize {
118        // Consider a possible OFFSET.
119        total = total.saturating_sub(offset);
120
121        // Consider a possible LIMIT.
122        if let Some(limit) = limit {
123            total = std::cmp::min(limit, total);
124        }
125
126        total
127    }
128
129    /// Total count of [`Row`]s represented by this collection.
130    pub fn count(&self) -> usize {
131        self.diffs.iter().map(|u| u.get()).sum()
132    }
133
134    /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection.
135    pub fn entries(&self) -> usize {
136        self.rows.len()
137    }
138
139    /// Returns the number of bytes this [`RowCollection`] uses.
140    pub fn byte_len(&self) -> usize {
141        // Count both the bytes in the byte array and the size of the offsets themselves.
142        let row_data_size = self
143            .rows
144            .byte_len()
145            .saturating_add(self.rows.len().saturating_mul(size_of::<usize>()));
146        let diff_size = self.diffs.len().saturating_mul(size_of::<NonZeroUsize>());
147        row_data_size.saturating_add(diff_size)
148    }
149
150    /// Returns a [`RowRef`] for the entry at `idx`, if one exists.
151    pub fn get(&self, idx: usize) -> Option<(&RowRef, &NonZeroUsize)> {
152        Some((self.rows.get(idx)?, self.diffs.get(idx)?))
153    }
154
155    /// "Sorts" the [`RowCollection`] by the column order in `order_by`. The output will be sorted
156    /// if the inputs were all sorted by the given order; otherwise, the order is unspecified.
157    /// In either case, the output will be a [RowCollection] that contains the full contents of all
158    /// the input collections.
159    pub fn merge_sorted(runs: &[Self], order_by: &[ColumnOrder]) -> RowCollection {
160        let comparator = RowComparator::new(order_by);
161        Self::merge_sorted_inner(runs, |a, b| comparator.compare_rows(a, b, || a.cmp(b)))
162    }
163
164    fn merge_sorted_inner<F>(runs: &[Self], cmp: F) -> RowCollection
165    where
166        F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
167    {
168        let mut metadata_len = 0;
169        let mut encoded_len = 0;
170        for collection in runs.iter() {
171            metadata_len += collection.rows.len();
172            encoded_len += collection.rows.byte_len();
173        }
174
175        let mut builder = Self::builder(encoded_len, metadata_len);
176
177        for (row, diff) in
178            mz_ore::iter::merge_iters_by(runs.iter().map(|r| r.iter()), |(r0, _), (r1, _)| {
179                cmp(r0, r1)
180            })
181        {
182            builder.push(row, diff);
183        }
184        builder.build()
185    }
186}
187
188#[derive(Debug, Clone)]
189pub struct RowCollectionIter {
190    /// Collection we're iterating over.
191    collection: RowCollection,
192
193    /// Index for the row we're currently referencing.
194    row_idx: usize,
195    /// Number of diffs we've emitted for the current row.
196    diff_idx: usize,
197
198    /// Maximum number of rows this iterator will yield.
199    limit: Option<usize>,
200    /// Number of rows we're offset by.
201    ///
202    /// Note: We eagerly apply an offset, but we track it here so we can
203    /// accurately report [`RowIterator::count`].
204    offset: usize,
205
206    /// Columns to underlying rows to include.
207    projection: Option<Vec<usize>>,
208    /// Allocations that we reuse for every iteration to project columns.
209    projection_buf: (DatumVec, Row),
210}
211
212impl RowCollectionIter {
213    /// Returns the inner `RowCollection`.
214    pub fn into_inner(self) -> RowCollection {
215        self.collection
216    }
217
218    /// Immediately applies an offset to this iterator.
219    pub fn apply_offset(mut self, offset: usize) -> RowCollectionIter {
220        Self::advance_by(
221            &self.collection,
222            &mut self.row_idx,
223            &mut self.diff_idx,
224            offset,
225        );
226
227        // Keep track of how many rows we've offset by.
228        self.offset = self.offset.saturating_add(offset);
229
230        self
231    }
232
233    /// Sets the limit for this iterator.
234    pub fn with_limit(mut self, limit: usize) -> RowCollectionIter {
235        self.limit = Some(limit);
236        self
237    }
238
239    /// Specify the columns that should be yielded.
240    pub fn with_projection(mut self, projection: Vec<usize>) -> RowCollectionIter {
241        // Omit the projection if it would be a no-op to avoid a relatively expensive memcpy.
242        if let Some((row, _)) = self.collection.get(0) {
243            let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
244            if projection.iter().copied().eq(cols) {
245                return self;
246            }
247        }
248
249        self.projection = Some(projection);
250        self
251    }
252
253    /// Helper method for implementing [`RowIterator`].
254    ///
255    /// Advances the internal pointers by the specified amount.
256    fn advance_by(
257        collection: &RowCollection,
258        row_idx: &mut usize,
259        diff_idx: &mut usize,
260        mut count: usize,
261    ) {
262        while count > 0 {
263            let Some((_, row_meta)) = collection.get(*row_idx) else {
264                return;
265            };
266
267            let remaining_diff = row_meta.get() - *diff_idx;
268            if remaining_diff <= count {
269                *diff_idx = 0;
270                *row_idx += 1;
271                count -= remaining_diff;
272            } else {
273                *diff_idx += count;
274                count = 0;
275            }
276        }
277    }
278
279    /// Helper method for implementing [`RowIterator`].
280    ///
281    /// Projects columns for the provided `row`.
282    fn project<'a>(
283        projection: Option<&[usize]>,
284        row: &'a RowRef,
285        datum_buf: &'a mut DatumVec,
286        row_buf: &'a mut Row,
287    ) -> &'a RowRef {
288        if let Some(projection) = projection {
289            // Copy the required columns into our reusable buffer.
290            {
291                let datums = datum_buf.borrow_with(row);
292                row_buf
293                    .packer()
294                    .extend(projection.iter().map(|i| &datums[*i]));
295            }
296
297            row_buf
298        } else {
299            row
300        }
301    }
302}
303
304impl RowIterator for RowCollectionIter {
305    fn next(&mut self) -> Option<&RowRef> {
306        // Bail if we've reached our limit.
307        if let Some(0) = self.limit {
308            return None;
309        }
310
311        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
312
313        // If we're about to yield a row, then subtract from our limit.
314        if let Some(limit) = &mut self.limit {
315            *limit = limit.saturating_sub(1);
316        }
317
318        // Advance to the next row.
319        Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
320
321        // Project away and/or re-order any columns.
322        let (datum_buf, row_buf) = &mut self.projection_buf;
323        Some(Self::project(
324            self.projection.as_deref(),
325            row,
326            datum_buf,
327            row_buf,
328        ))
329    }
330
331    fn peek(&mut self) -> Option<&RowRef> {
332        // Bail if we've reached our limit.
333        if let Some(0) = self.limit {
334            return None;
335        }
336
337        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
338
339        // Note: Unlike `next()` we do not subtract from our limit, nor advance
340        // the internal pointers.
341
342        // Project away and/or re-order any columns.
343        let (datum_buf, row_buf) = &mut self.projection_buf;
344        Some(Self::project(
345            self.projection.as_deref(),
346            row,
347            datum_buf,
348            row_buf,
349        ))
350    }
351
352    fn count(&self) -> usize {
353        RowCollection::offset_limit(self.collection.count(), self.offset, self.limit)
354    }
355
356    fn box_clone(&self) -> Box<dyn RowIterator> {
357        Box::new(self.clone())
358    }
359}
360
361impl IntoRowIterator for RowCollection {
362    type Iter = RowCollectionIter;
363
364    fn into_row_iter(self) -> Self::Iter {
365        RowCollectionIter {
366            collection: self,
367            row_idx: 0,
368            diff_idx: 0,
369            limit: None,
370            offset: 0,
371            projection: None,
372            // Note: Neither of these types allocate until elements are pushed in.
373            projection_buf: (DatumVec::new(), Row::default()),
374        }
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use std::borrow::Borrow;
381
382    use mz_ore::assert_none;
383    use mz_repr::Datum;
384    use proptest::prelude::*;
385    use proptest::test_runner::Config;
386
387    use super::*;
388
389    impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
390        fn from(rows: T) -> Self {
391            let mut encoded = Rows::builder(0, 0);
392            let mut diffs = vec![];
393
394            for row in rows {
395                encoded.push(row.as_row_ref());
396                diffs.push(NonZeroUsize::MIN);
397            }
398
399            RowCollection {
400                rows: encoded.build(),
401                diffs: diffs.into(),
402            }
403        }
404    }
405
406    #[mz_ore::test]
407    fn test_row_collection() {
408        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
409        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
410
411        let collection = RowCollection::from([&a, &b]);
412
413        let (a_rnd, _) = collection.get(0).unwrap();
414        assert_eq!(a_rnd, a.borrow());
415
416        let (b_rnd, _) = collection.get(1).unwrap();
417        assert_eq!(b_rnd, b.borrow());
418    }
419
420    #[mz_ore::test]
421    fn test_merge() {
422        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
423        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
424
425        let mut a_col = RowCollection::from([&a]);
426        let b_col = RowCollection::from([&b]);
427
428        a_col.concat(&b_col);
429
430        assert_eq!(a_col.count(), 2);
431        assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
432        assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));
433    }
434
435    #[mz_ore::test]
436    fn test_sort() {
437        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
438        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
439        let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
440        let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);
441
442        let cols = {
443            let mut part = [&a, &b];
444            part.sort_by(|a, b| a.cmp(b));
445            let part1 = RowCollection::from(part);
446            let mut part = [&c, &d];
447            part.sort_by(|a, b| a.cmp(b));
448            let part2 = RowCollection::from(part);
449            vec![part1, part2]
450        };
451        let mut rows = [a, b, c, d];
452
453        let sorted_view = RowCollection::merge_sorted(&cols, &[]);
454        rows.sort_by(|a, b| a.cmp(b));
455
456        for i in 0..rows.len() {
457            let (row_x, _) = sorted_view.get(i).unwrap();
458            let row_y = rows.get(i).unwrap();
459
460            assert_eq!(row_x, row_y.borrow());
461        }
462    }
463
464    #[mz_ore::test]
465    fn test_sorted_iter() {
466        let a = Row::pack_slice(&[Datum::String("hello world")]);
467        let b = Row::pack_slice(&[Datum::UInt32(42)]);
468        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
469        let col = RowCollection::merge_sorted(
470            &[
471                col,
472                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
473            ],
474            &[],
475        );
476        let mut iter = col.into_row_iter();
477
478        // Peek shouldn't advance the iterator.
479        assert_eq!(iter.peek(), Some(b.borrow()));
480
481        assert_eq!(iter.next(), Some(b.borrow()));
482        assert_eq!(iter.next(), Some(b.borrow()));
483        assert_eq!(iter.next(), Some(a.borrow()));
484        assert_eq!(iter.next(), Some(a.borrow()));
485        assert_eq!(iter.next(), Some(a.borrow()));
486        assert_eq!(iter.next(), None);
487
488        // For good measure make sure we don't panic.
489        assert_eq!(iter.next(), None);
490        assert_eq!(iter.peek(), None);
491    }
492
493    #[mz_ore::test]
494    fn test_sorted_iter_offset() {
495        let a = Row::pack_slice(&[Datum::String("hello world")]);
496        let b = Row::pack_slice(&[Datum::UInt32(42)]);
497        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
498        let col = RowCollection::merge_sorted(
499            &[
500                col,
501                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
502            ],
503            &[],
504        );
505
506        // Test with a reasonable offset that does not span rows.
507        let mut iter = col.into_row_iter().apply_offset(1);
508        assert_eq!(iter.next(), Some(b.borrow()));
509        assert_eq!(iter.next(), Some(a.borrow()));
510        assert_eq!(iter.next(), Some(a.borrow()));
511        assert_eq!(iter.next(), Some(a.borrow()));
512        assert_eq!(iter.next(), None);
513        assert_eq!(iter.next(), None);
514
515        let col = iter.into_inner();
516
517        // Test with an offset that spans the first row.
518        let mut iter = col.into_row_iter().apply_offset(3);
519
520        assert_eq!(iter.peek(), Some(a.borrow()));
521
522        assert_eq!(iter.next(), Some(a.borrow()));
523        assert_eq!(iter.next(), Some(a.borrow()));
524        assert_eq!(iter.next(), None);
525        assert_eq!(iter.next(), None);
526
527        let col = iter.into_inner();
528
529        // Test with an offset that passes the entire collection.
530        let mut iter = col.into_row_iter().apply_offset(100);
531        assert_eq!(iter.peek(), None);
532        assert_eq!(iter.next(), None);
533        assert_eq!(iter.peek(), None);
534        assert_eq!(iter.next(), None);
535    }
536
537    #[mz_ore::test]
538    fn test_sorted_iter_limit() {
539        let a = Row::pack_slice(&[Datum::String("hello world")]);
540        let b = Row::pack_slice(&[Datum::UInt32(42)]);
541        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
542        let col = RowCollection::merge_sorted(
543            &[
544                col,
545                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
546            ],
547            &[],
548        );
549
550        // Test with a limit that spans only the first row.
551        let mut iter = col.into_row_iter().with_limit(1);
552        assert_eq!(iter.next(), Some(b.borrow()));
553        assert_eq!(iter.next(), None);
554        assert_eq!(iter.next(), None);
555
556        let col = iter.into_inner();
557
558        // Test with a limit that spans both rows.
559        let mut iter = col.into_row_iter().with_limit(4);
560        assert_eq!(iter.peek(), Some(b.borrow()));
561        assert_eq!(iter.next(), Some(b.borrow()));
562        assert_eq!(iter.next(), Some(b.borrow()));
563
564        assert_eq!(iter.peek(), Some(a.borrow()));
565        assert_eq!(iter.next(), Some(a.borrow()));
566        assert_eq!(iter.next(), Some(a.borrow()));
567
568        assert_eq!(iter.next(), None);
569        assert_eq!(iter.next(), None);
570
571        let col = iter.into_inner();
572
573        // Test with a limit that is more rows than we have.
574        let mut iter = col.into_row_iter().with_limit(1000);
575        assert_eq!(iter.next(), Some(b.borrow()));
576        assert_eq!(iter.next(), Some(b.borrow()));
577        assert_eq!(iter.next(), Some(a.borrow()));
578        assert_eq!(iter.next(), Some(a.borrow()));
579        assert_eq!(iter.next(), Some(a.borrow()));
580        assert_eq!(iter.next(), None);
581        assert_eq!(iter.next(), None);
582
583        let col = iter.into_inner();
584
585        // Test with a limit of 0.
586        let mut iter = col.into_row_iter().with_limit(0);
587        assert_eq!(iter.peek(), None);
588        assert_eq!(iter.next(), None);
589        assert_eq!(iter.next(), None);
590    }
591
592    #[mz_ore::test]
593    fn test_mapped_row_iterator() {
594        let a = Row::pack_slice(&[Datum::String("hello world")]);
595        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
596
597        // Make sure we can call `.map` on a `dyn RowIterator`.
598        let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());
599
600        let mut mapped = iter.map(|f| f.to_owned());
601        assert!(mapped.next().is_some());
602        assert!(mapped.next().is_some());
603        assert!(mapped.next().is_some());
604        assert_none!(mapped.next());
605        assert_none!(mapped.next());
606    }
607
608    #[mz_ore::test]
609    fn test_projected_row_iterator() {
610        let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
611        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);
612
613        // Project away the first column.
614        let mut iter = col.into_row_iter().with_projection(vec![1]);
615
616        let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
617        assert_eq!(iter.next(), Some(projected_a.as_ref()));
618        assert_eq!(iter.next(), Some(projected_a.as_ref()));
619        assert_eq!(iter.next(), None);
620        assert_eq!(iter.next(), None);
621
622        let col = iter.into_inner();
623
624        // Project away all columns.
625        let mut iter = col.into_row_iter().with_projection(vec![]);
626
627        let projected_a = Row::default();
628        assert_eq!(iter.next(), Some(projected_a.as_ref()));
629        assert_eq!(iter.next(), Some(projected_a.as_ref()));
630        assert_eq!(iter.next(), None);
631        assert_eq!(iter.next(), None);
632
633        let col = iter.into_inner();
634
635        // Include all columns.
636        let mut iter = col.into_row_iter().with_projection(vec![0, 1]);
637
638        assert_eq!(iter.next(), Some(a.as_ref()));
639        assert_eq!(iter.next(), Some(a.as_ref()));
640        assert_eq!(iter.next(), None);
641        assert_eq!(iter.next(), None);
642
643        let col = iter.into_inner();
644
645        // Swap the order of columns.
646        let mut iter = col.into_row_iter().with_projection(vec![1, 0]);
647
648        let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
649        assert_eq!(iter.next(), Some(projected_a.as_ref()));
650        assert_eq!(iter.next(), Some(projected_a.as_ref()));
651        assert_eq!(iter.next(), None);
652        assert_eq!(iter.next(), None);
653    }
654
655    #[mz_ore::test]
656    fn test_count_respects_limit_and_offset() {
657        let a = Row::pack_slice(&[Datum::String("hello world")]);
658        let b = Row::pack_slice(&[Datum::UInt32(42)]);
659        let col = RowCollection::new(
660            vec![
661                (a.clone(), NonZeroUsize::new(3).unwrap()),
662                (b.clone(), NonZeroUsize::new(2).unwrap()),
663            ],
664            &[],
665        );
666
667        // How many total rows there are.
668        let iter = col.into_row_iter();
669        assert_eq!(iter.count(), 5);
670
671        let col = iter.into_inner();
672
673        // With a LIMIT.
674        let iter = col.into_row_iter().with_limit(1);
675        assert_eq!(iter.count(), 1);
676
677        let col = iter.into_inner();
678
679        // With a LIMIT larger than the total number of rows.
680        let iter = col.into_row_iter().with_limit(100);
681        assert_eq!(iter.count(), 5);
682
683        let col = iter.into_inner();
684
685        // With an OFFSET.
686        let iter = col.into_row_iter().apply_offset(3);
687        assert_eq!(iter.count(), 2);
688
689        let col = iter.into_inner();
690
691        // With an OFFSET greater than the total number of rows.
692        let iter = col.into_row_iter().apply_offset(100);
693        assert_eq!(iter.count(), 0);
694
695        let col = iter.into_inner();
696
697        // With a LIMIT and an OFFSET.
698        let iter = col.into_row_iter().with_limit(2).apply_offset(4);
699        assert_eq!(iter.count(), 1);
700    }
701
702    #[mz_ore::test]
703    #[cfg_attr(miri, ignore)] // too slow
704    fn proptest_row_collection() {
705        fn row_collection_roundtrips(rows: Vec<Row>) {
706            let collection = RowCollection::from(&rows);
707
708            for i in 0..rows.len() {
709                let (a, _) = collection.get(i).unwrap();
710                let b = rows.get(i).unwrap().borrow();
711
712                assert_eq!(a, b);
713            }
714        }
715
716        // This test is slow, so we limit the default number of test cases.
717        proptest!(
718            Config { cases: 5, ..Default::default() },
719            |(rows in any::<Vec<Row>>())| {
720                // The proptest! macro interferes with rustfmt.
721                row_collection_roundtrips(rows)
722            }
723        );
724    }
725
726    #[mz_ore::test]
727    #[cfg_attr(miri, ignore)] // too slow
728    fn proptest_merge() {
729        fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
730            let mut a_col = RowCollection::from(&a);
731            let b_col = RowCollection::from(&b);
732
733            a_col.concat(&b_col);
734
735            let all_rows = a.iter().chain(b.iter());
736            for (idx, row) in all_rows.enumerate() {
737                let (col_row, _) = a_col.get(idx).unwrap();
738                assert_eq!(col_row, row.borrow());
739            }
740        }
741
742        // This test is slow, so we limit the default number of test cases.
743        proptest!(
744            Config { cases: 3, ..Default::default() },
745            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
746                // The proptest! macro interferes with rustfmt.
747                row_collection_merge(a, b)
748            }
749        );
750    }
751
752    #[mz_ore::test]
753    #[cfg_attr(miri, ignore)] // too slow
754    fn proptest_sort() {
755        fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
756            a.sort_by(|a, b| a.cmp(b));
757            b.sort_by(|a, b| a.cmp(b));
758
759            let sorted_view = RowCollection::merge_sorted(
760                &[RowCollection::from(&a), RowCollection::from(&b)],
761                &[],
762            );
763
764            a.append(&mut b);
765            a.sort_by(|a, b| a.cmp(b));
766
767            for i in 0..a.len() {
768                let (row_x, _) = sorted_view.get(i).unwrap();
769                let row_y = a.get(i).unwrap();
770
771                assert_eq!(row_x, row_y.borrow());
772            }
773        }
774
775        // This test is slow, so we limit the default number of test cases.
776        proptest!(
777            Config { cases: 5, ..Default::default() },
778            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
779                // The proptest! macro interferes with rustfmt.
780                row_collection_sort(a, b)
781            }
782        );
783    }
784}