1use std::cell::RefCell;
13use std::cmp::Reverse;
14use std::collections::BinaryHeap;
15use std::num::NonZeroUsize;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use mz_ore::cast::CastFrom;
20use mz_proto::RustType;
21use mz_repr::{DatumVec, IntoRowIterator, Row, RowIterator, RowRef};
22use serde::{Deserialize, Serialize};
23
24use crate::ColumnOrder;
25
26include!(concat!(env!("OUT_DIR"), "/mz_expr.row.collection.rs"));
27
28#[derive(Default, Debug, Clone, PartialEq)]
33pub struct RowCollection {
34 encoded: Bytes,
36 metadata: Arc<[EncodedRowMetadata]>,
38 runs: Vec<usize>,
40}
41
42impl RowCollection {
43 pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
49 if order_by.is_empty() {
51 rows.sort();
53 } else {
54 let (mut datum_vec1, mut datum_vec2) = (DatumVec::new(), DatumVec::new());
55 rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
56 let borrow1 = datum_vec1.borrow_with(row1);
57 let borrow2 = datum_vec2.borrow_with(row2);
58 crate::compare_columns(order_by, &borrow1, &borrow2, || row1.cmp(row2))
59 });
60 }
61
62 let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
68
69 let mut encoded = Vec::<u8>::with_capacity(encoded_size);
70 let mut metadata = Vec::<EncodedRowMetadata>::with_capacity(rows.len());
71 let runs = (!rows.is_empty())
72 .then(|| vec![rows.len()])
73 .unwrap_or_default();
74
75 for (row, diff) in rows {
76 encoded.extend(row.data());
77 metadata.push(EncodedRowMetadata {
78 offset: encoded.len(),
79 diff,
80 });
81 }
82
83 RowCollection {
84 encoded: Bytes::from(encoded),
85 metadata: metadata.into(),
86 runs,
87 }
88 }
89
90 pub fn merge(&mut self, other: &RowCollection) {
92 if other.count(0, None) == 0 {
93 return;
94 }
95
96 let mut new_bytes = vec![0; self.encoded.len() + other.encoded.len()];
98 new_bytes[..self.encoded.len()].copy_from_slice(&self.encoded[..]);
99 new_bytes[self.encoded.len()..].copy_from_slice(&other.encoded[..]);
100
101 let mapped_metas = other.metadata.iter().map(|meta| EncodedRowMetadata {
102 offset: meta.offset + self.encoded.len(),
103 diff: meta.diff,
104 });
105 let self_len = self.metadata.len();
106
107 self.metadata = self.metadata.iter().cloned().chain(mapped_metas).collect();
108 self.encoded = Bytes::from(new_bytes);
109 self.runs.extend(other.runs.iter().map(|f| f + self_len));
110 }
111
112 pub fn count(&self, offset: usize, limit: Option<usize>) -> usize {
115 let mut total: usize = self.metadata.iter().map(|meta| meta.diff.get()).sum();
116
117 total = total.saturating_sub(offset);
119
120 if let Some(limit) = limit {
122 total = std::cmp::min(limit, total);
123 }
124
125 total
126 }
127
128 pub fn entries(&self) -> usize {
130 self.metadata.len()
131 }
132
133 pub fn byte_len(&self) -> usize {
135 let row_data_size = self.encoded.len();
136 let metadata_size = self
137 .metadata
138 .len()
139 .saturating_mul(std::mem::size_of::<EncodedRowMetadata>());
140
141 row_data_size.saturating_add(metadata_size)
142 }
143
144 pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
146 let (lower_offset, upper) = match idx {
147 0 => (0, self.metadata.get(idx)?),
148 _ => {
149 let lower = self.metadata.get(idx - 1).map(|m| m.offset)?;
150 let upper = self.metadata.get(idx)?;
151 (lower, upper)
152 }
153 };
154
155 let slice = &self.encoded[lower_offset..upper.offset];
156 let row = RowRef::from_slice(slice);
157
158 Some((row, upper))
159 }
160
161 pub fn sorted_view(self, order_by: &[ColumnOrder]) -> SortedRowCollection {
164 if order_by.is_empty() {
165 self.sorted_view_inner(&Ord::cmp)
166 } else {
167 let left_datum_vec = RefCell::new(mz_repr::DatumVec::new());
168 let right_datum_vec = RefCell::new(mz_repr::DatumVec::new());
169
170 let cmp = &|left: &RowRef, right: &RowRef| {
171 let (mut left_datum_vec, mut right_datum_vec) =
172 (left_datum_vec.borrow_mut(), right_datum_vec.borrow_mut());
173 let left_datums = left_datum_vec.borrow_with(left);
174 let right_datums = right_datum_vec.borrow_with(right);
175 crate::compare_columns(order_by, &left_datums, &right_datums, || left.cmp(right))
176 };
177 self.sorted_view_inner(cmp)
178 }
179 }
180
181 fn sorted_view_inner<F>(self, cmp: &F) -> SortedRowCollection
182 where
183 F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
184 {
185 let mut heap = BinaryHeap::with_capacity(self.runs.len());
186
187 for index in 0..self.runs.len() {
188 let start = (index > 0).then(|| self.runs[index - 1]).unwrap_or(0);
189 let end = self.runs[index];
190
191 heap.push(Reverse(RunIter {
192 collection: &self,
193 cmp,
194 range: start..end,
195 }));
196 }
197
198 let mut view = Vec::with_capacity(self.metadata.len());
199
200 while let Some(Reverse(mut run)) = heap.pop() {
201 if let Some(next) = run.range.next() {
202 view.push(next);
203 if !run.range.is_empty() {
204 heap.push(Reverse(run));
205 }
206 }
207 }
208
209 SortedRowCollection {
210 collection: self,
211 sorted_view: view.into(),
212 }
213 }
214}
215
216impl RustType<ProtoRowCollection> for RowCollection {
217 fn into_proto(&self) -> ProtoRowCollection {
218 ProtoRowCollection {
219 encoded: Bytes::clone(&self.encoded),
220 metadata: self
221 .metadata
222 .iter()
223 .map(EncodedRowMetadata::into_proto)
224 .collect(),
225 runs: self.runs.iter().copied().map(u64::cast_from).collect(),
226 }
227 }
228
229 fn from_proto(proto: ProtoRowCollection) -> Result<Self, mz_proto::TryFromProtoError> {
230 Ok(RowCollection {
231 encoded: proto.encoded,
232 metadata: proto
233 .metadata
234 .into_iter()
235 .map(EncodedRowMetadata::from_proto)
236 .collect::<Result<_, _>>()?,
237 runs: proto.runs.into_iter().map(usize::cast_from).collect(),
238 })
239 }
240}
241
242#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
244pub struct EncodedRowMetadata {
245 offset: usize,
249 diff: NonZeroUsize,
255}
256
257impl RustType<ProtoEncodedRowMetadata> for EncodedRowMetadata {
258 fn into_proto(&self) -> ProtoEncodedRowMetadata {
259 ProtoEncodedRowMetadata {
260 offset: u64::cast_from(self.offset),
261 diff: self.diff.into_proto(),
262 }
263 }
264
265 fn from_proto(proto: ProtoEncodedRowMetadata) -> Result<Self, mz_proto::TryFromProtoError> {
266 Ok(EncodedRowMetadata {
267 offset: usize::cast_from(proto.offset),
268 diff: NonZeroUsize::from_proto(proto.diff)?,
269 })
270 }
271}
272
273#[derive(Debug, Clone)]
275pub struct SortedRowCollection {
276 collection: RowCollection,
278 sorted_view: Arc<[usize]>,
280}
281
282impl SortedRowCollection {
283 pub fn get(&self, idx: usize) -> Option<(&RowRef, &EncodedRowMetadata)> {
284 self.sorted_view
285 .get(idx)
286 .and_then(|inner_idx| self.collection.get(*inner_idx))
287 }
288}
289
290#[derive(Debug, Clone)]
291pub struct SortedRowCollectionIter {
292 collection: SortedRowCollection,
294
295 row_idx: usize,
297 diff_idx: usize,
299
300 limit: Option<usize>,
302 offset: usize,
307
308 projection: Option<Vec<usize>>,
310 projection_buf: (DatumVec, Row),
312}
313
314impl SortedRowCollectionIter {
315 pub fn into_inner(self) -> SortedRowCollection {
317 self.collection
318 }
319
320 pub fn apply_offset(mut self, offset: usize) -> SortedRowCollectionIter {
322 Self::advance_by(
323 &self.collection,
324 &mut self.row_idx,
325 &mut self.diff_idx,
326 offset,
327 );
328
329 self.offset = self.offset.saturating_add(offset);
331
332 self
333 }
334
335 pub fn with_limit(mut self, limit: usize) -> SortedRowCollectionIter {
337 self.limit = Some(limit);
338 self
339 }
340
341 pub fn with_projection(mut self, projection: Vec<usize>) -> SortedRowCollectionIter {
343 if let Some((row, _)) = self.collection.get(0) {
345 let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
346 if projection.iter().copied().eq(cols) {
347 return self;
348 }
349 }
350
351 self.projection = Some(projection);
352 self
353 }
354
355 fn advance_by(
359 collection: &SortedRowCollection,
360 row_idx: &mut usize,
361 diff_idx: &mut usize,
362 mut count: usize,
363 ) {
364 while count > 0 {
365 let Some((_, row_meta)) = collection.get(*row_idx) else {
366 return;
367 };
368
369 let remaining_diff = row_meta.diff.get() - *diff_idx;
370 if remaining_diff <= count {
371 *diff_idx = 0;
372 *row_idx += 1;
373 count -= remaining_diff;
374 } else {
375 *diff_idx += count;
376 count = 0;
377 }
378 }
379 }
380
381 fn project<'a>(
385 projection: Option<&[usize]>,
386 row: &'a RowRef,
387 datum_buf: &'a mut DatumVec,
388 row_buf: &'a mut Row,
389 ) -> &'a RowRef {
390 if let Some(projection) = projection {
391 {
393 let datums = datum_buf.borrow_with(row);
394 row_buf
395 .packer()
396 .extend(projection.iter().map(|i| &datums[*i]));
397 }
398
399 row_buf
400 } else {
401 row
402 }
403 }
404}
405
406impl RowIterator for SortedRowCollectionIter {
407 fn next(&mut self) -> Option<&RowRef> {
408 if let Some(0) = self.limit {
410 return None;
411 }
412
413 let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
414
415 if let Some(limit) = &mut self.limit {
417 *limit = limit.saturating_sub(1);
418 }
419
420 Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
422
423 let (datum_buf, row_buf) = &mut self.projection_buf;
425 Some(Self::project(
426 self.projection.as_deref(),
427 row,
428 datum_buf,
429 row_buf,
430 ))
431 }
432
433 fn peek(&mut self) -> Option<&RowRef> {
434 if let Some(0) = self.limit {
436 return None;
437 }
438
439 let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
440
441 let (datum_buf, row_buf) = &mut self.projection_buf;
446 Some(Self::project(
447 self.projection.as_deref(),
448 row,
449 datum_buf,
450 row_buf,
451 ))
452 }
453
454 fn count(&self) -> usize {
455 self.collection.collection.count(self.offset, self.limit)
456 }
457
458 fn box_clone(&self) -> Box<dyn RowIterator> {
459 Box::new(self.clone())
460 }
461}
462
463impl IntoRowIterator for SortedRowCollection {
464 type Iter = SortedRowCollectionIter;
465
466 fn into_row_iter(self) -> Self::Iter {
467 SortedRowCollectionIter {
468 collection: self,
469 row_idx: 0,
470 diff_idx: 0,
471 limit: None,
472 offset: 0,
473 projection: None,
474 projection_buf: (DatumVec::new(), Row::default()),
476 }
477 }
478}
479
480struct RunIter<'a, F> {
482 collection: &'a RowCollection,
483 cmp: &'a F,
484 range: std::ops::Range<usize>,
485}
486
487impl<'a, F> PartialOrd for RunIter<'a, F>
488where
489 F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
490{
491 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
492 Some(self.cmp(other))
493 }
494}
495
496impl<'a, F> Ord for RunIter<'a, F>
497where
498 F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
499{
500 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
501 let left = self.collection.get(self.range.start).unwrap().0;
502 let right = self.collection.get(other.range.start).unwrap().0;
503 (self.cmp)(left, right)
504 }
505}
506
507impl<'a, F> PartialEq for RunIter<'a, F>
508where
509 F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
510{
511 fn eq(&self, other: &Self) -> bool {
512 self.cmp(other) == std::cmp::Ordering::Equal
513 }
514}
515
516impl<'a, F> Eq for RunIter<'a, F> where F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering {}
517
518#[cfg(test)]
519mod tests {
520 use std::borrow::Borrow;
521
522 use mz_ore::assert_none;
523 use mz_repr::Datum;
524 use proptest::prelude::*;
525 use proptest::test_runner::Config;
526
527 use super::*;
528
529 impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
530 fn from(rows: T) -> Self {
531 let mut encoded = Vec::<u8>::new();
532 let mut metadata = Vec::<EncodedRowMetadata>::new();
533
534 for row in rows {
535 encoded.extend(row.data());
536 metadata.push(EncodedRowMetadata {
537 offset: encoded.len(),
538 diff: NonZeroUsize::MIN,
539 });
540 }
541 let runs = vec![metadata.len()];
542
543 RowCollection {
544 encoded: Bytes::from(encoded),
545 metadata: metadata.into(),
546 runs,
547 }
548 }
549 }
550
551 #[mz_ore::test]
552 fn test_row_collection() {
553 let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
554 let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
555
556 let collection = RowCollection::from([&a, &b]);
557
558 let (a_rnd, _) = collection.get(0).unwrap();
559 assert_eq!(a_rnd, a.borrow());
560
561 let (b_rnd, _) = collection.get(1).unwrap();
562 assert_eq!(b_rnd, b.borrow());
563 }
564
565 #[mz_ore::test]
566 fn test_merge() {
567 let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
568 let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
569
570 let mut a_col = RowCollection::from([&a]);
571 let b_col = RowCollection::from([&b]);
572
573 a_col.merge(&b_col);
574
575 assert_eq!(a_col.count(0, None), 2);
576 assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
577 assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));
578 }
579
580 #[mz_ore::test]
581 fn test_sort() {
582 let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
583 let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
584 let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
585 let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);
586
587 let col = {
588 let mut part = [&a, &b];
589 part.sort_by(|a, b| a.cmp(b));
590 let mut part1 = RowCollection::from(part);
591 let mut part = [&c, &d];
592 part.sort_by(|a, b| a.cmp(b));
593 let part2 = RowCollection::from(part);
594 part1.merge(&part2);
595 part1
596 };
597 let mut rows = [a, b, c, d];
598
599 let sorted_view = col.sorted_view(&[]);
600 rows.sort_by(|a, b| a.cmp(b));
601
602 for i in 0..rows.len() {
603 let (row_x, _) = sorted_view.get(i).unwrap();
604 let row_y = rows.get(i).unwrap();
605
606 assert_eq!(row_x, row_y.borrow());
607 }
608 }
609
610 #[mz_ore::test]
611 fn test_sorted_iter() {
612 let a = Row::pack_slice(&[Datum::String("hello world")]);
613 let b = Row::pack_slice(&[Datum::UInt32(42)]);
614 let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
615 col.merge(&RowCollection::new(
616 vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
617 &[],
618 ));
619 let col = col.sorted_view(&[]);
620 let mut iter = col.into_row_iter();
621
622 assert_eq!(iter.peek(), Some(b.borrow()));
624
625 assert_eq!(iter.next(), Some(b.borrow()));
626 assert_eq!(iter.next(), Some(b.borrow()));
627 assert_eq!(iter.next(), Some(a.borrow()));
628 assert_eq!(iter.next(), Some(a.borrow()));
629 assert_eq!(iter.next(), Some(a.borrow()));
630 assert_eq!(iter.next(), None);
631
632 assert_eq!(iter.next(), None);
634 assert_eq!(iter.peek(), None);
635 }
636
637 #[mz_ore::test]
638 fn test_sorted_iter_offset() {
639 let a = Row::pack_slice(&[Datum::String("hello world")]);
640 let b = Row::pack_slice(&[Datum::UInt32(42)]);
641 let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
642 col.merge(&RowCollection::new(
643 vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
644 &[],
645 ));
646 let col = col.sorted_view(&[]);
647
648 let mut iter = col.into_row_iter().apply_offset(1);
650 assert_eq!(iter.next(), Some(b.borrow()));
651 assert_eq!(iter.next(), Some(a.borrow()));
652 assert_eq!(iter.next(), Some(a.borrow()));
653 assert_eq!(iter.next(), Some(a.borrow()));
654 assert_eq!(iter.next(), None);
655 assert_eq!(iter.next(), None);
656
657 let col = iter.into_inner();
658
659 let mut iter = col.into_row_iter().apply_offset(3);
661
662 assert_eq!(iter.peek(), Some(a.borrow()));
663
664 assert_eq!(iter.next(), Some(a.borrow()));
665 assert_eq!(iter.next(), Some(a.borrow()));
666 assert_eq!(iter.next(), None);
667 assert_eq!(iter.next(), None);
668
669 let col = iter.into_inner();
670
671 let mut iter = col.into_row_iter().apply_offset(100);
673 assert_eq!(iter.peek(), None);
674 assert_eq!(iter.next(), None);
675 assert_eq!(iter.peek(), None);
676 assert_eq!(iter.next(), None);
677 }
678
679 #[mz_ore::test]
680 fn test_sorted_iter_limit() {
681 let a = Row::pack_slice(&[Datum::String("hello world")]);
682 let b = Row::pack_slice(&[Datum::UInt32(42)]);
683 let mut col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
684 col.merge(&RowCollection::new(
685 vec![(b.clone(), NonZeroUsize::new(2).unwrap())],
686 &[],
687 ));
688 let col = col.sorted_view(&[]);
689
690 let mut iter = col.into_row_iter().with_limit(1);
692 assert_eq!(iter.next(), Some(b.borrow()));
693 assert_eq!(iter.next(), None);
694 assert_eq!(iter.next(), None);
695
696 let col = iter.into_inner();
697
698 let mut iter = col.into_row_iter().with_limit(4);
700 assert_eq!(iter.peek(), Some(b.borrow()));
701 assert_eq!(iter.next(), Some(b.borrow()));
702 assert_eq!(iter.next(), Some(b.borrow()));
703
704 assert_eq!(iter.peek(), Some(a.borrow()));
705 assert_eq!(iter.next(), Some(a.borrow()));
706 assert_eq!(iter.next(), Some(a.borrow()));
707
708 assert_eq!(iter.next(), None);
709 assert_eq!(iter.next(), None);
710
711 let col = iter.into_inner();
712
713 let mut iter = col.into_row_iter().with_limit(1000);
715 assert_eq!(iter.next(), Some(b.borrow()));
716 assert_eq!(iter.next(), Some(b.borrow()));
717 assert_eq!(iter.next(), Some(a.borrow()));
718 assert_eq!(iter.next(), Some(a.borrow()));
719 assert_eq!(iter.next(), Some(a.borrow()));
720 assert_eq!(iter.next(), None);
721 assert_eq!(iter.next(), None);
722
723 let col = iter.into_inner();
724
725 let mut iter = col.into_row_iter().with_limit(0);
727 assert_eq!(iter.peek(), None);
728 assert_eq!(iter.next(), None);
729 assert_eq!(iter.next(), None);
730 }
731
732 #[mz_ore::test]
733 fn test_mapped_row_iterator() {
734 let a = Row::pack_slice(&[Datum::String("hello world")]);
735 let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
736 let col = col.sorted_view(&[]);
737
738 let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());
740
741 let mut mapped = iter.map(|f| f.to_owned());
742 assert!(mapped.next().is_some());
743 assert!(mapped.next().is_some());
744 assert!(mapped.next().is_some());
745 assert_none!(mapped.next());
746 assert_none!(mapped.next());
747 }
748
749 #[mz_ore::test]
750 fn test_projected_row_iterator() {
751 let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
752 let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);
753 let col = col.sorted_view(&[]);
754
755 let mut iter = col.into_row_iter().with_projection(vec![1]);
757
758 let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
759 assert_eq!(iter.next(), Some(projected_a.as_ref()));
760 assert_eq!(iter.next(), Some(projected_a.as_ref()));
761 assert_eq!(iter.next(), None);
762 assert_eq!(iter.next(), None);
763
764 let col = iter.into_inner();
765
766 let mut iter = col.into_row_iter().with_projection(vec![]);
768
769 let projected_a = Row::default();
770 assert_eq!(iter.next(), Some(projected_a.as_ref()));
771 assert_eq!(iter.next(), Some(projected_a.as_ref()));
772 assert_eq!(iter.next(), None);
773 assert_eq!(iter.next(), None);
774
775 let col = iter.into_inner();
776
777 let mut iter = col.into_row_iter().with_projection(vec![0, 1]);
779
780 assert_eq!(iter.next(), Some(a.as_ref()));
781 assert_eq!(iter.next(), Some(a.as_ref()));
782 assert_eq!(iter.next(), None);
783 assert_eq!(iter.next(), None);
784
785 let col = iter.into_inner();
786
787 let mut iter = col.into_row_iter().with_projection(vec![1, 0]);
789
790 let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
791 assert_eq!(iter.next(), Some(projected_a.as_ref()));
792 assert_eq!(iter.next(), Some(projected_a.as_ref()));
793 assert_eq!(iter.next(), None);
794 assert_eq!(iter.next(), None);
795 }
796
797 #[mz_ore::test]
798 fn test_count_respects_limit_and_offset() {
799 let a = Row::pack_slice(&[Datum::String("hello world")]);
800 let b = Row::pack_slice(&[Datum::UInt32(42)]);
801 let col = RowCollection::new(
802 vec![
803 (a.clone(), NonZeroUsize::new(3).unwrap()),
804 (b.clone(), NonZeroUsize::new(2).unwrap()),
805 ],
806 &[],
807 );
808 let col = col.sorted_view(&[]);
809
810 let iter = col.into_row_iter();
812 assert_eq!(iter.count(), 5);
813
814 let col = iter.into_inner();
815
816 let iter = col.into_row_iter().with_limit(1);
818 assert_eq!(iter.count(), 1);
819
820 let col = iter.into_inner();
821
822 let iter = col.into_row_iter().with_limit(100);
824 assert_eq!(iter.count(), 5);
825
826 let col = iter.into_inner();
827
828 let iter = col.into_row_iter().apply_offset(3);
830 assert_eq!(iter.count(), 2);
831
832 let col = iter.into_inner();
833
834 let iter = col.into_row_iter().apply_offset(100);
836 assert_eq!(iter.count(), 0);
837
838 let col = iter.into_inner();
839
840 let iter = col.into_row_iter().with_limit(2).apply_offset(4);
842 assert_eq!(iter.count(), 1);
843 }
844
845 #[mz_ore::test]
846 #[cfg_attr(miri, ignore)] fn proptest_row_collection() {
848 fn row_collection_roundtrips(rows: Vec<Row>) {
849 let collection = RowCollection::from(&rows);
850
851 for i in 0..rows.len() {
852 let (a, _) = collection.get(i).unwrap();
853 let b = rows.get(i).unwrap().borrow();
854
855 assert_eq!(a, b);
856 }
857 }
858
859 proptest!(
861 Config { cases: 10, ..Default::default() },
862 |(rows in any::<Vec<Row>>())| {
863 row_collection_roundtrips(rows)
865 }
866 );
867 }
868
869 #[mz_ore::test]
870 #[cfg_attr(miri, ignore)] fn proptest_merge() {
872 fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
873 let mut a_col = RowCollection::from(&a);
874 let b_col = RowCollection::from(&b);
875
876 a_col.merge(&b_col);
877
878 let all_rows = a.iter().chain(b.iter());
879 for (idx, row) in all_rows.enumerate() {
880 let (col_row, _) = a_col.get(idx).unwrap();
881 assert_eq!(col_row, row.borrow());
882 }
883 }
884
885 proptest!(
887 Config { cases: 5, ..Default::default() },
888 |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
889 row_collection_merge(a, b)
891 }
892 );
893 }
894
895 #[mz_ore::test]
896 #[cfg_attr(miri, ignore)] fn proptest_sort() {
898 fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
899 a.sort_by(|a, b| a.cmp(b));
900 b.sort_by(|a, b| a.cmp(b));
901 let mut col = RowCollection::from(&a);
902 col.merge(&RowCollection::from(&b));
903
904 let sorted_view = col.sorted_view(&[]);
905
906 a.append(&mut b);
907 a.sort_by(|a, b| a.cmp(b));
908
909 for i in 0..a.len() {
910 let (row_x, _) = sorted_view.get(i).unwrap();
911 let row_y = a.get(i).unwrap();
912
913 assert_eq!(row_x, row_y.borrow());
914 }
915 }
916
917 proptest!(
919 Config { cases: 10, ..Default::default() },
920 |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
921 row_collection_sort(a, b)
923 }
924 );
925 }
926}