mz_expr/row/
collection.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Defines types for working with collections of [`Row`].
11
12use std::cell::RefCell;
13use std::cmp::Reverse;
14use std::collections::BinaryHeap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use mz_ore::cast::CastFrom;
20use mz_proto::RustType;
21use mz_repr::{DatumVec, IntoRowIterator, Row, RowIterator, RowRef};
22use serde::{Deserialize, Serialize};
23
24use crate::ColumnOrder;
25
26include!(concat!(env!("OUT_DIR"), "/mz_expr.row.collection.rs"));
27
28/// Collection of runs of sorted [`Row`]s represented as a single blob.
29///
30/// Note: the encoding format we use to represent [`Row`]s in this struct is
31/// not stable, and thus should never be persisted durably.
32#[derive(Default, Debug, Clone, PartialEq)]
33pub struct RowCollection {
34    /// Contiguous blob of encoded Rows.
35    encoded: Bytes,
36    /// Metadata about an individual Row in the blob.
37    metadata: Arc<[EncodedRowMetadata]>,
38    /// Ends of non-empty, sorted runs of rows in index into `metadata`.
39    runs: Vec<usize>,
40}
41
42impl RowCollection {
43    /// Create a new [`RowCollection`] from a collection of [`Row`]s. Sorts data by `order_by`.
44    ///
45    /// Note that all row collections to be merged must be constructed with the same `order_by`
46    /// to ensure a consistent sort order. Anything else is undefined behavior.
47    // TODO: Remember the `order_by` and assert that it is the same for all collections.
48    pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
49        // Sort data to maintain sortedness invariants.
50        if order_by.is_empty() {
51            // Skip row decoding if not required.
52            rows.sort();
53        } else {
54            let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new());
55            rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
56                let borrow1 = datum_vec1.borrow_with(row1);
57                let borrow2 = datum_vec2.borrow_with(row2);
58                crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2))
59            });
60        }
61
62        // Pre-sizing our buffer should allow us to make just 1 allocation, and
63        // use the perfect amount of memory.
64        //
65        // Note(parkmycar): I didn't do any benchmarking to determine if this
66        // is faster, so feel free to change this if you'd like.
67        let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
68
69        let mut encoded = Vec::<u8>::with_capacity(encoded_size);
70        let mut metadata = Vec::<EncodedRowMetadata>::with_capacity(rows.len());
71        let runs = (!rows.is_empty())
72            .then(|| vec![rows.len()])
73            .unwrap_or_default();
74
75        for (row, diff) in rows {
76            encoded.extend(row.data());
77            metadata.push(EncodedRowMetadata {
78                offset: encoded.len(),
79                diff,
80            });
81        }
82
83        RowCollection {
84            encoded: Bytes::from(encoded),
85            metadata: metadata.into(),
86            runs,
87        }
88    }
89
90    /// Merge another [`RowCollection`] into `self`.
91    pub fn merge(&mut self, other: &RowCollection) {
92        if other.count(0, None) == 0 {
93            return;
94        }
95
96        // TODO(parkmycar): Using SegmentedBytes here would be nice.
97        let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()];
98        new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]);
99        new_bytes[self.encoded.len()..].copy_from_slice(&other.encoded[..]);
100
101        let mapped_metas = other.metadata.iter().map(|meta| EncodedRowMetadata {
102            offset: meta.offset + self.encoded.len(),
103            diff: meta.diff,
104        });
105        let self_len = self.metadata.len();
106
107        self.metadata = self.metadata.iter().cloned().chain(mapped_metas).collect();
108        self.encoded = Bytes::from(new_bytes);
109        self.runs.extend(other.runs.iter().map(|f| f + self_len));
110    }
111
112    /// Total count of [`Row`]s represented by this collection, considering a
113    /// possible `OFFSET` and `LIMIT`.
114    pub fn count(&self, offset: usize, limit: Option<usize>) -> usize {
115        let mut total: usize = self.metadata.iter().map(|meta| meta.diff.get()).sum();
116
117        // Consider a possible OFFSET.
118        total = total.saturating_sub(offset);
119
120        // Consider a possible LIMIT.
121        if let Some(limit) = limit {
122            total = std::cmp::min(limit, total);
123        }
124
125        total
126    }
127
128    /// Total count of ([`Row`], `EncodedRowMetadata`) pairs in this collection.
129    pub fn entries(&self) -> usize {
130        self.metadata.len()
131    }
132
133    /// Returns the number of bytes this [`RowCollection`] uses.
134    pub fn byte_len(&self) -> usize {
135        let row_data_size = self.encoded.len();
136        let metadata_size = self
137            .metadata
138            .len()
139            .saturating_mul(std::mem::size_of::<EncodedRowMetadata>());
140
141        row_data_size.saturating_add(metadata_size)
142    }
143
144    /// Returns a [`RowRef`] for the entry at `idx`, if one exists.
145    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
146        let (lower_offset, upper) = match idx {
147            0 => (0, self.metadata.get(idx)?),
148            _ => {
149                let lower = self.metadata.get(idx - 1).map(|m| m.offset)?;
150                let upper = self.metadata.get(idx)?;
151                (lower, upper)
152            }
153        };
154
155        let slice = &self.encoded[lower_offset..upper.offset];
156        let row = RowRef::from_slice(slice);
157
158        Some((row, upper))
159    }
160
161    /// "Sorts" the [`RowCollection`] by the column order in `order_by`. Returns a sorted view over
162    /// the collection.
163    pub fn sorted_view(self, order_by: &[ColumnOrder]) -> SortedRowCollection {
164        if order_by.is_empty() {
165            self.sorted_view_inner(&Ord::cmp)
166        } else {
167            let left_datum_vec = RefCell::new(mz_repr::DatumVec::new());
168            let right_datum_vec = RefCell::new(mz_repr::DatumVec::new());
169
170            let cmp = &|left: &RowRef, right: &RowRef| {
171                let (mut left_datum_vec, mut right_datum_vec) =
172                    (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut());
173                let left_datums = left_datum_vec.borrow_with(left);
174                let right_datums = right_datum_vec.borrow_with(right);
175                crate::compare_columns(order_by, &left_datums, &right_datums, || left.cmp(right))
176            };
177            self.sorted_view_inner(cmp)
178        }
179    }
180
181    fn sorted_view_inner<F>(self, cmp: &F) -> SortedRowCollection
182    where
183        F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
184    {
185        let mut heap = BinaryHeap::with_capacity(self.runs.len());
186
187        for index in 0..self.runs.len() {
188            let start = (index > 0).then(|| self.runs[index - 1]).unwrap_or(0);
189            let end = self.runs[index];
190
191            heap.push(Reverse(RunIter {
192                collection: &self,
193                cmp,
194                range: start..end,
195            }));
196        }
197
198        let mut view = Vec::with_capacity(self.metadata.len());
199
200        while let Some(Reverse(mut run)) = heap.pop() {
201            if let Some(next) = run.range.next() {
202                view.push(next);
203                if !run.range.is_empty() {
204                    heap.push(Reverse(run));
205                }
206            }
207        }
208
209        SortedRowCollection {
210            collection: self,
211            sorted_view: view.into(),
212        }
213    }
214}
215
216impl RustType<ProtoRowCollection> for RowCollection {
217    fn into_proto(&self) -> ProtoRowCollection {
218        ProtoRowCollection {
219            encoded: Bytes::clone(&self.encoded),
220            metadata: self
221                .metadata
222                .iter()
223                .map(EncodedRowMetadata::into_proto)
224                .collect(),
225            runs: self.runs.iter().copied().map(u64::cast_from).collect(),
226        }
227    }
228
229    fn from_proto(proto: ProtoRowCollection) -> Result<Self, mz_proto::TryFromProtoError> {
230        Ok(RowCollection {
231            encoded: proto.encoded,
232            metadata: proto
233                .metadata
234                .into_iter()
235                .map(EncodedRowMetadata::from_proto)
236                .collect::<Result<_, _>>()?,
237            runs: proto.runs.into_iter().map(usize::cast_from).collect(),
238        })
239    }
240}
241
242/// Inner type of [`RowCollection`], describes a single Row.
243#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
244pub struct EncodedRowMetadata {
245    /// Offset into the binary blob of encoded rows.
246    ///
247    /// TODO(parkmycar): Consider making this a `u32`.
248    offset: usize,
249    /// Diff for the Row.
250    ///
251    /// TODO(parkmycar): Consider making this a smaller type, note that some compute introspection
252    /// collections, e.g. `mz_scheduling_elapsed_raw`, encodes nano seconds in the diff field which
253    /// requires a u64.
254    diff: NonZeroUsize,
255}
256
257impl RustType<ProtoEncodedRowMetadata> for EncodedRowMetadata {
258    fn into_proto(&self) -> ProtoEncodedRowMetadata {
259        ProtoEncodedRowMetadata {
260            offset: u64::cast_from(self.offset),
261            diff: self.diff.into_proto(),
262        }
263    }
264
265    fn from_proto(proto: ProtoEncodedRowMetadata) -> Result<Self, mz_proto::TryFromProtoError> {
266        Ok(EncodedRowMetadata {
267            offset: usize::cast_from(proto.offset),
268            diff: NonZeroUsize::from_proto(proto.diff)?,
269        })
270    }
271}
272
273/// Provides a sorted view of a [`RowCollection`].
274#[derive(Debug, Clone)]
275pub struct SortedRowCollection {
276    /// The inner [`RowCollection`].
277    collection: RowCollection,
278    /// Indexes into the inner collection that represent the sorted order.
279    sorted_view: Arc<[usize]>,
280}
281
282impl SortedRowCollection {
283    pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
284        self.sorted_view
285            .get(idx)
286            .and_then(|inner_idx| self.collection.get(*inner_idx))
287    }
288}
289
290#[derive(Debug, Clone)]
291pub struct SortedRowCollectionIter {
292    /// Collection we're iterating over.
293    collection: SortedRowCollection,
294
295    /// Index for the row we're currently referencing.
296    row_idx: usize,
297    /// Number of diffs we've emitted for the current row.
298    diff_idx: usize,
299
300    /// Maximum number of rows this iterator will yield.
301    limit: Option<usize>,
302    /// Number of rows we're offset by.
303    ///
304    /// Note: We eagerly apply an offset, but we track it here so we can
305    /// accurately report [`RowIterator::count`].
306    offset: usize,
307
308    /// Columns to underlying rows to include.
309    projection: Option<Vec<usize>>,
310    /// Allocations that we reuse for every iteration to project columns.
311    projection_buf: (DatumVec, Row),
312}
313
314impl SortedRowCollectionIter {
315    /// Returns the inner `SortedRowCollection`.
316    pub fn into_inner(self) -> SortedRowCollection {
317        self.collection
318    }
319
320    /// Immediately applies an offset to this iterator.
321    pub fn apply_offset(mut self, offset: usize) -> SortedRowCollectionIter {
322        Self::advance_by(
323            &self.collection,
324            &mut self.row_idx,
325            &mut self.diff_idx,
326            offset,
327        );
328
329        // Keep track of how many rows we've offset by.
330        self.offset = self.offset.saturating_add(offset);
331
332        self
333    }
334
335    /// Sets the limit for this iterator.
336    pub fn with_limit(mut self, limit: usize) -> SortedRowCollectionIter {
337        self.limit = Some(limit);
338        self
339    }
340
341    /// Specify the columns that should be yielded.
342    pub fn with_projection(mut self, projection: Vec<usize>) -> SortedRowCollectionIter {
343        // Omit the projection if it would be a no-op to avoid a relatively expensive memcpy.
344        if let Some((row, _)) = self.collection.get(0) {
345            let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
346            if projection.iter().copied().eq(cols) {
347                return self;
348            }
349        }
350
351        self.projection = Some(projection);
352        self
353    }
354
355    /// Helper method for implementing [`RowIterator`].
356    ///
357    /// Advances the internal pointers by the specified amount.
358    fn advance_by(
359        collection: &SortedRowCollection,
360        row_idx: &mut usize,
361        diff_idx: &mut usize,
362        mut count: usize,
363    ) {
364        while count > 0 {
365            let Some((_, row_meta)) = collection.get(*row_idx) else {
366                return;
367            };
368
369            let remaining_diff = row_meta.diff.get() - *diff_idx;
370            if remaining_diff <= count {
371                *diff_idx = 0;
372                *row_idx += 1;
373                count -= remaining_diff;
374            } else {
375                *diff_idx += count;
376                count = 0;
377            }
378        }
379    }
380
381    /// Helper method for implementing [`RowIterator`].
382    ///
383    /// Projects columns for the provided `row`.
384    fn project<'a>(
385        projection: Option<&[usize]>,
386        row: &'a RowRef,
387        datum_buf: &'a mut DatumVec,
388        row_buf: &'a mut Row,
389    ) -> &'a RowRef {
390        if let Some(projection) = projection {
391            // Copy the required columns into our reusable buffer.
392            {
393                let datums = datum_buf.borrow_with(row);
394                row_buf
395                    .packer()
396                    .extend(projection.iter().map(|i| &datums[*i]));
397            }
398
399            row_buf
400        } else {
401            row
402        }
403    }
404}
405
406impl RowIterator for SortedRowCollectionIter {
407    fn next(&mut self) -> Option<&RowRef> {
408        // Bail if we've reached our limit.
409        if let Some(0) = self.limit {
410            return None;
411        }
412
413        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
414
415        // If we're about to yield a row, then subtract from our limit.
416        if let Some(limit) = &mut self.limit {
417            *limit = limit.saturating_sub(1);
418        }
419
420        // Advance to the next row.
421        Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
422
423        // Project away and/or re-order any columns.
424        let (datum_buf, row_buf) = &mut self.projection_buf;
425        Some(Self::project(
426            self.projection.as_deref(),
427            row,
428            datum_buf,
429            row_buf,
430        ))
431    }
432
433    fn peek(&mut self) -> Option<&RowRef> {
434        // Bail if we've reached our limit.
435        if let Some(0) = self.limit {
436            return None;
437        }
438
439        let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
440
441        // Note: Unlike `next()` we do not subtract from our limit, nor advance
442        // the internal pointers.
443
444        // Project away and/or re-order any columns.
445        let (datum_buf, row_buf) = &mut self.projection_buf;
446        Some(Self::project(
447            self.projection.as_deref(),
448            row,
449            datum_buf,
450            row_buf,
451        ))
452    }
453
454    fn count(&self) -> usize {
455        self.collection.collection.count(self.offset, self.limit)
456    }
457
458    fn box_clone(&self) -> Box<dyn RowIterator> {
459        Box::new(self.clone())
460    }
461}
462
463impl IntoRowIterator for SortedRowCollection {
464    type Iter = SortedRowCollectionIter;
465
466    fn into_row_iter(self) -> Self::Iter {
467        SortedRowCollectionIter {
468            collection: self,
469            row_idx: 0,
470            diff_idx: 0,
471            limit: None,
472            offset: 0,
473            projection: None,
474            // Note: Neither of these types allocate until elements are pushed in.
475            projection_buf: (DatumVec::new(), Row::default()),
476        }
477    }
478}
479
480/// Iterator-like struct to help with extracting rows in sorted order from `RowCollection`.
481struct RunIter<'a, F> {
482    collection: &'a RowCollection,
483    cmp: &'a F,
484    range: std::ops::Range<usize>,
485}
486
487impl<'a, F> PartialOrd for RunIter<'a, F>
488where
489    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
490{
491    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
492        Some(self.cmp(other))
493    }
494}
495
496impl<'a, F> Ord for RunIter<'a, F>
497where
498    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
499{
500    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
501        let left = self.collection.get(self.range.start).unwrap().0;
502        let right = self.collection.get(other.range.start).unwrap().0;
503        (self.cmp)(left, right)
504    }
505}
506
507impl<'a, F> PartialEq for RunIter<'a, F>
508where
509    F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
510{
511    fn eq(&self, other: &Self) -> bool {
512        self.cmp(other) == std::cmp::Ordering::Equal
513    }
514}
515
516impl<'a, F> Eq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {}
517
518#[cfg(test)]
519mod tests {
520    use std::borrow::Borrow;
521
522    use mz_ore::assert_none;
523    use mz_repr::Datum;
524    use proptest::prelude::*;
525    use proptest::test_runner::Config;
526
527    use super::*;
528
529    impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
530        fn from(rows: T) -> Self {
531            let mut encoded = Vec::<u8>::new();
532            let mut metadata = Vec::<EncodedRowMetadata>::new();
533
534            for row in rows {
535                encoded.extend(row.data());
536                metadata.push(EncodedRowMetadata {
537                    offset: encoded.len(),
538                    diff: NonZeroUsize::MIN,
539                });
540            }
541            let runs = vec![metadata.len()];
542
543            RowCollection {
544                encoded: Bytes::from(encoded),
545                metadata: metadata.into(),
546                runs,
547            }
548        }
549    }
550
551    #[mz_ore::test]
552    fn test_row_collection() {
553        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
554        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
555
556        let collection = RowCollection::from([&a, &b]);
557
558        let (a_rnd, _) = collection.get(0).unwrap();
559        assert_eq!(a_rnd, a.borrow());
560
561        let (b_rnd, _) = collection.get(1).unwrap();
562        assert_eq!(b_rnd, b.borrow());
563    }
564
565    #[mz_ore::test]
566    fn test_merge() {
567        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
568        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
569
570        let mut a_col = RowCollection::from([&a]);
571        let b_col = RowCollection::from([&b]);
572
573        a_col.merge(&b_col);
574
575        assert_eq!(a_col.count(0, None), 2);
576        assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
577        assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));
578    }
579
580    #[mz_ore::test]
581    fn test_sort() {
582        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
583        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
584        let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
585        let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);
586
587        let col = {
588            let mut part = [&a, &b];
589            part.sort_by(|a, b| a.cmp(b));
590            let mut part1 = RowCollection::from(part);
591            let mut part = [&c, &d];
592            part.sort_by(|a, b| a.cmp(b));
593            let part2 = RowCollection::from(part);
594            part1.merge(&part2);
595            part1
596        };
597        let mut rows = [a, b, c, d];
598
599        let sorted_view = col.sorted_view(&[]);
600        rows.sort_by(|a, b| a.cmp(b));
601
602        for i in 0..rows.len() {
603            let (row_x, _) = sorted_view.get(i).unwrap();
604            let row_y = rows.get(i).unwrap();
605
606            assert_eq!(row_x, row_y.borrow());
607        }
608    }
609
610    #[mz_ore::test]
611    fn test_sorted_iter() {
612        let a = Row::pack_slice(&[Datum::String("hello world")]);
613        let b = Row::pack_slice(&[Datum::UInt32(42)]);
614        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
615        col.merge(&RowCollection::new(
616            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
617            &[],
618        ));
619        let col = col.sorted_view(&[]);
620        let mut iter = col.into_row_iter();
621
622        // Peek shouldn't advance the iterator.
623        assert_eq!(iter.peek(), Some(b.borrow()));
624
625        assert_eq!(iter.next(), Some(b.borrow()));
626        assert_eq!(iter.next(), Some(b.borrow()));
627        assert_eq!(iter.next(), Some(a.borrow()));
628        assert_eq!(iter.next(), Some(a.borrow()));
629        assert_eq!(iter.next(), Some(a.borrow()));
630        assert_eq!(iter.next(), None);
631
632        // For good measure make sure we don't panic.
633        assert_eq!(iter.next(), None);
634        assert_eq!(iter.peek(), None);
635    }
636
637    #[mz_ore::test]
638    fn test_sorted_iter_offset() {
639        let a = Row::pack_slice(&[Datum::String("hello world")]);
640        let b = Row::pack_slice(&[Datum::UInt32(42)]);
641        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
642        col.merge(&RowCollection::new(
643            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
644            &[],
645        ));
646        let col = col.sorted_view(&[]);
647
648        // Test with a reasonable offset that does not span rows.
649        let mut iter = col.into_row_iter().apply_offset(1);
650        assert_eq!(iter.next(), Some(b.borrow()));
651        assert_eq!(iter.next(), Some(a.borrow()));
652        assert_eq!(iter.next(), Some(a.borrow()));
653        assert_eq!(iter.next(), Some(a.borrow()));
654        assert_eq!(iter.next(), None);
655        assert_eq!(iter.next(), None);
656
657        let col = iter.into_inner();
658
659        // Test with an offset that spans the first row.
660        let mut iter = col.into_row_iter().apply_offset(3);
661
662        assert_eq!(iter.peek(), Some(a.borrow()));
663
664        assert_eq!(iter.next(), Some(a.borrow()));
665        assert_eq!(iter.next(), Some(a.borrow()));
666        assert_eq!(iter.next(), None);
667        assert_eq!(iter.next(), None);
668
669        let col = iter.into_inner();
670
671        // Test with an offset that passes the entire collection.
672        let mut iter = col.into_row_iter().apply_offset(100);
673        assert_eq!(iter.peek(), None);
674        assert_eq!(iter.next(), None);
675        assert_eq!(iter.peek(), None);
676        assert_eq!(iter.next(), None);
677    }
678
679    #[mz_ore::test]
680    fn test_sorted_iter_limit() {
681        let a = Row::pack_slice(&[Datum::String("hello world")]);
682        let b = Row::pack_slice(&[Datum::UInt32(42)]);
683        let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
684        col.merge(&RowCollection::new(
685            vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
686            &[],
687        ));
688        let col = col.sorted_view(&[]);
689
690        // Test with a limit that spans only the first row.
691        let mut iter = col.into_row_iter().with_limit(1);
692        assert_eq!(iter.next(), Some(b.borrow()));
693        assert_eq!(iter.next(), None);
694        assert_eq!(iter.next(), None);
695
696        let col = iter.into_inner();
697
698        // Test with a limit that spans both rows.
699        let mut iter = col.into_row_iter().with_limit(4);
700        assert_eq!(iter.peek(), Some(b.borrow()));
701        assert_eq!(iter.next(), Some(b.borrow()));
702        assert_eq!(iter.next(), Some(b.borrow()));
703
704        assert_eq!(iter.peek(), Some(a.borrow()));
705        assert_eq!(iter.next(), Some(a.borrow()));
706        assert_eq!(iter.next(), Some(a.borrow()));
707
708        assert_eq!(iter.next(), None);
709        assert_eq!(iter.next(), None);
710
711        let col = iter.into_inner();
712
713        // Test with a limit that is more rows than we have.
714        let mut iter = col.into_row_iter().with_limit(1000);
715        assert_eq!(iter.next(), Some(b.borrow()));
716        assert_eq!(iter.next(), Some(b.borrow()));
717        assert_eq!(iter.next(), Some(a.borrow()));
718        assert_eq!(iter.next(), Some(a.borrow()));
719        assert_eq!(iter.next(), Some(a.borrow()));
720        assert_eq!(iter.next(), None);
721        assert_eq!(iter.next(), None);
722
723        let col = iter.into_inner();
724
725        // Test with a limit of 0.
726        let mut iter = col.into_row_iter().with_limit(0);
727        assert_eq!(iter.peek(), None);
728        assert_eq!(iter.next(), None);
729        assert_eq!(iter.next(), None);
730    }
731
732    #[mz_ore::test]
733    fn test_mapped_row_iterator() {
734        let a = Row::pack_slice(&[Datum::String("hello world")]);
735        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
736        let col = col.sorted_view(&[]);
737
738        // Make sure we can call `.map` on a `dyn RowIterator`.
739        let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());
740
741        let mut mapped = iter.map(|f| f.to_owned());
742        assert!(mapped.next().is_some());
743        assert!(mapped.next().is_some());
744        assert!(mapped.next().is_some());
745        assert_none!(mapped.next());
746        assert_none!(mapped.next());
747    }
748
749    #[mz_ore::test]
750    fn test_projected_row_iterator() {
751        let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
752        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);
753        let col = col.sorted_view(&[]);
754
755        // Project away the first column.
756        let mut iter = col.into_row_iter().with_projection(vec![1]);
757
758        let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
759        assert_eq!(iter.next(), Some(projected_a.as_ref()));
760        assert_eq!(iter.next(), Some(projected_a.as_ref()));
761        assert_eq!(iter.next(), None);
762        assert_eq!(iter.next(), None);
763
764        let col = iter.into_inner();
765
766        // Project away all columns.
767        let mut iter = col.into_row_iter().with_projection(vec![]);
768
769        let projected_a = Row::default();
770        assert_eq!(iter.next(), Some(projected_a.as_ref()));
771        assert_eq!(iter.next(), Some(projected_a.as_ref()));
772        assert_eq!(iter.next(), None);
773        assert_eq!(iter.next(), None);
774
775        let col = iter.into_inner();
776
777        // Include all columns.
778        let mut iter = col.into_row_iter().with_projection(vec![0, 1]);
779
780        assert_eq!(iter.next(), Some(a.as_ref()));
781        assert_eq!(iter.next(), Some(a.as_ref()));
782        assert_eq!(iter.next(), None);
783        assert_eq!(iter.next(), None);
784
785        let col = iter.into_inner();
786
787        // Swap the order of columns.
788        let mut iter = col.into_row_iter().with_projection(vec![1, 0]);
789
790        let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
791        assert_eq!(iter.next(), Some(projected_a.as_ref()));
792        assert_eq!(iter.next(), Some(projected_a.as_ref()));
793        assert_eq!(iter.next(), None);
794        assert_eq!(iter.next(), None);
795    }
796
797    #[mz_ore::test]
798    fn test_count_respects_limit_and_offset() {
799        let a = Row::pack_slice(&[Datum::String("hello world")]);
800        let b = Row::pack_slice(&[Datum::UInt32(42)]);
801        let col = RowCollection::new(
802            vec![
803                (a.clone(), NonZeroUsize::new(3).unwrap()),
804                (b.clone(), NonZeroUsize::new(2).unwrap()),
805            ],
806            &[],
807        );
808        let col = col.sorted_view(&[]);
809
810        // How many total rows there are.
811        let iter = col.into_row_iter();
812        assert_eq!(iter.count(), 5);
813
814        let col = iter.into_inner();
815
816        // With a LIMIT.
817        let iter = col.into_row_iter().with_limit(1);
818        assert_eq!(iter.count(), 1);
819
820        let col = iter.into_inner();
821
822        // With a LIMIT larger than the total number of rows.
823        let iter = col.into_row_iter().with_limit(100);
824        assert_eq!(iter.count(), 5);
825
826        let col = iter.into_inner();
827
828        // With an OFFSET.
829        let iter = col.into_row_iter().apply_offset(3);
830        assert_eq!(iter.count(), 2);
831
832        let col = iter.into_inner();
833
834        // With an OFFSET greater than the total number of rows.
835        let iter = col.into_row_iter().apply_offset(100);
836        assert_eq!(iter.count(), 0);
837
838        let col = iter.into_inner();
839
840        // With a LIMIT and an OFFSET.
841        let iter = col.into_row_iter().with_limit(2).apply_offset(4);
842        assert_eq!(iter.count(), 1);
843    }
844
845    #[mz_ore::test]
846    #[cfg_attr(miri, ignore)] // too slow
847    fn proptest_row_collection() {
848        fn row_collection_roundtrips(rows: Vec<Row>) {
849            let collection = RowCollection::from(&rows);
850
851            for i in 0..rows.len() {
852                let (a, _) = collection.get(i).unwrap();
853                let b = rows.get(i).unwrap().borrow();
854
855                assert_eq!(a, b);
856            }
857        }
858
859        // This test is slow, so we limit the default number of test cases.
860        proptest!(
861            Config { cases: 10, ..Default::default() },
862            |(rows in any::<Vec<Row>>())| {
863                // The proptest! macro interferes with rustfmt.
864                row_collection_roundtrips(rows)
865            }
866        );
867    }
868
869    #[mz_ore::test]
870    #[cfg_attr(miri, ignore)] // too slow
871    fn proptest_merge() {
872        fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
873            let mut a_col = RowCollection::from(&a);
874            let b_col = RowCollection::from(&b);
875
876            a_col.merge(&b_col);
877
878            let all_rows = a.iter().chain(b.iter());
879            for (idx, row) in all_rows.enumerate() {
880                let (col_row, _) = a_col.get(idx).unwrap();
881                assert_eq!(col_row, row.borrow());
882            }
883        }
884
885        // This test is slow, so we limit the default number of test cases.
886        proptest!(
887            Config { cases: 5, ..Default::default() },
888            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
889                // The proptest! macro interferes with rustfmt.
890                row_collection_merge(a, b)
891            }
892        );
893    }
894
895    #[mz_ore::test]
896    #[cfg_attr(miri, ignore)] // too slow
897    fn proptest_sort() {
898        fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
899            a.sort_by(|a, b| a.cmp(b));
900            b.sort_by(|a, b| a.cmp(b));
901            let mut col = RowCollection::from(&a);
902            col.merge(&RowCollection::from(&b));
903
904            let sorted_view = col.sorted_view(&[]);
905
906            a.append(&mut b);
907            a.sort_by(|a, b| a.cmp(b));
908
909            for i in 0..a.len() {
910                let (row_x, _) = sorted_view.get(i).unwrap();
911                let row_y = a.get(i).unwrap();
912
913                assert_eq!(row_x, row_y.borrow());
914            }
915        }
916
917        // This test is slow, so we limit the default number of test cases.
918        proptest!(
919            Config { cases: 10, ..Default::default() },
920            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
921                // The proptest! macro interferes with rustfmt.
922                row_collection_sort(a, b)
923            }
924        );
925    }
926}