1use std::num::NonZeroUsize;
13
14use itertools::Itertools;
15use mz_repr::{
16 DatumVec, IntoRowIterator, Row, RowIterator, RowRef, Rows, RowsBuilder, SharedSlice,
17};
18use serde::{Deserialize, Serialize};
19
20use crate::{ColumnOrder, RowComparator};
21
/// A collection of [`Row`]s, each paired with a positive multiplicity ("diff").
///
/// Rows are stored encoded in a [`Rows`] buffer and their multiplicities in a parallel
/// [`SharedSlice`]: entry `i` of `diffs` is the multiplicity of entry `i` of `rows`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RowCollection {
    /// The encoded rows, in collection order.
    rows: Rows,
    /// The multiplicity of each row in `rows`, in the same order.
    diffs: SharedSlice<NonZeroUsize>,
}
33
34#[derive(Debug)]
35pub struct RowCollectionBuilder {
36 rows: RowsBuilder,
37 diffs: Vec<NonZeroUsize>,
38}
39
40impl RowCollectionBuilder {
41 pub fn push(&mut self, row: &RowRef, diff: NonZeroUsize) {
42 self.rows.push(row);
43 self.diffs.push(diff);
44 }
45
46 pub fn build(self) -> RowCollection {
47 RowCollection {
48 rows: self.rows.build(),
49 diffs: self.diffs.into(),
50 }
51 }
52}
53
54impl RowCollection {
55 pub fn builder(byte_len_hint: usize, len_hint: usize) -> RowCollectionBuilder {
59 RowCollectionBuilder {
60 rows: Rows::builder(byte_len_hint, len_hint),
61 diffs: Vec::with_capacity(len_hint),
62 }
63 }
64
65 pub fn new(mut rows: Vec<(Row, NonZeroUsize)>, order_by: &[ColumnOrder]) -> Self {
71 let comparator = RowComparator::new(order_by);
72 rows.sort_by(|(row1, _diff1), (row2, _diff2)| {
74 comparator.compare_rows(row1, row2, || row1.cmp(row2))
75 });
76
77 let encoded_size = rows.iter().map(|(row, _diff)| row.data_len()).sum();
83
84 let mut builder = Self::builder(encoded_size, rows.len());
85 for (row, diff) in rows {
86 builder.push(row.as_row_ref(), diff);
87 }
88 builder.build()
89 }
90
91 fn iter(&self) -> impl Iterator<Item = (&RowRef, NonZeroUsize)> {
92 self.rows.iter().zip_eq(self.diffs.iter().copied())
93 }
94
95 pub fn concat(&mut self, other: &RowCollection) {
99 if other.count() == 0 {
100 return;
101 }
102
103 let byte_len = self.rows.byte_len() + other.rows.byte_len();
105 let len = self.rows.len() + other.rows.len();
106 let mut builder = Self::builder(byte_len, len);
107 for (row, diff) in self.iter().chain(other.iter()) {
108 builder.push(row, diff);
109 }
110 *self = builder.build();
111 }
112
113 pub fn offset_limit(mut total: usize, offset: usize, limit: Option<usize>) -> usize {
118 total = total.saturating_sub(offset);
120
121 if let Some(limit) = limit {
123 total = std::cmp::min(limit, total);
124 }
125
126 total
127 }
128
129 pub fn count(&self) -> usize {
131 self.diffs.iter().map(|u| u.get()).sum()
132 }
133
134 pub fn entries(&self) -> usize {
136 self.rows.len()
137 }
138
139 pub fn byte_len(&self) -> usize {
141 let row_data_size = self
143 .rows
144 .byte_len()
145 .saturating_add(self.rows.len().saturating_mul(size_of::<usize>()));
146 let diff_size = self.diffs.len().saturating_mul(size_of::<NonZeroUsize>());
147 row_data_size.saturating_add(diff_size)
148 }
149
150 pub fn get(&self, idx: usize) -> Option<(&RowRef, &NonZeroUsize)> {
152 Some((self.rows.get(idx)?, self.diffs.get(idx)?))
153 }
154
155 pub fn merge_sorted(runs: &[Self], order_by: &[ColumnOrder]) -> RowCollection {
160 let comparator = RowComparator::new(order_by);
161 Self::merge_sorted_inner(runs, |a, b| comparator.compare_rows(a, b, || a.cmp(b)))
162 }
163
164 fn merge_sorted_inner<F>(runs: &[Self], cmp: F) -> RowCollection
165 where
166 F: Fn(&RowRef, &RowRef) -> std::cmp::Ordering,
167 {
168 let mut metadata_len = 0;
169 let mut encoded_len = 0;
170 for collection in runs.iter() {
171 metadata_len += collection.rows.len();
172 encoded_len += collection.rows.byte_len();
173 }
174
175 let mut builder = Self::builder(encoded_len, metadata_len);
176
177 for (row, diff) in
178 mz_ore::iter::merge_iters_by(runs.iter().map(|r| r.iter()), |(r0, _), (r1, _)| {
179 cmp(r0, r1)
180 })
181 {
182 builder.push(row, diff);
183 }
184 builder.build()
185 }
186}
187
188#[derive(Debug, Clone)]
189pub struct RowCollectionIter {
190 collection: RowCollection,
192
193 row_idx: usize,
195 diff_idx: usize,
197
198 limit: Option<usize>,
200 offset: usize,
205
206 projection: Option<Vec<usize>>,
208 projection_buf: (DatumVec, Row),
210}
211
212impl RowCollectionIter {
213 pub fn into_inner(self) -> RowCollection {
215 self.collection
216 }
217
218 pub fn apply_offset(mut self, offset: usize) -> RowCollectionIter {
220 Self::advance_by(
221 &self.collection,
222 &mut self.row_idx,
223 &mut self.diff_idx,
224 offset,
225 );
226
227 self.offset = self.offset.saturating_add(offset);
229
230 self
231 }
232
233 pub fn with_limit(mut self, limit: usize) -> RowCollectionIter {
235 self.limit = Some(limit);
236 self
237 }
238
239 pub fn with_projection(mut self, projection: Vec<usize>) -> RowCollectionIter {
241 if let Some((row, _)) = self.collection.get(0) {
243 let cols = row.into_iter().enumerate().map(|(idx, _datum)| idx);
244 if projection.iter().copied().eq(cols) {
245 return self;
246 }
247 }
248
249 self.projection = Some(projection);
250 self
251 }
252
253 fn advance_by(
257 collection: &RowCollection,
258 row_idx: &mut usize,
259 diff_idx: &mut usize,
260 mut count: usize,
261 ) {
262 while count > 0 {
263 let Some((_, row_meta)) = collection.get(*row_idx) else {
264 return;
265 };
266
267 let remaining_diff = row_meta.get() - *diff_idx;
268 if remaining_diff <= count {
269 *diff_idx = 0;
270 *row_idx += 1;
271 count -= remaining_diff;
272 } else {
273 *diff_idx += count;
274 count = 0;
275 }
276 }
277 }
278
279 fn project<'a>(
283 projection: Option<&[usize]>,
284 row: &'a RowRef,
285 datum_buf: &'a mut DatumVec,
286 row_buf: &'a mut Row,
287 ) -> &'a RowRef {
288 if let Some(projection) = projection {
289 {
291 let datums = datum_buf.borrow_with(row);
292 row_buf
293 .packer()
294 .extend(projection.iter().map(|i| &datums[*i]));
295 }
296
297 row_buf
298 } else {
299 row
300 }
301 }
302}
303
304impl RowIterator for RowCollectionIter {
305 fn next(&mut self) -> Option<&RowRef> {
306 if let Some(0) = self.limit {
308 return None;
309 }
310
311 let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
312
313 if let Some(limit) = &mut self.limit {
315 *limit = limit.saturating_sub(1);
316 }
317
318 Self::advance_by(&self.collection, &mut self.row_idx, &mut self.diff_idx, 1);
320
321 let (datum_buf, row_buf) = &mut self.projection_buf;
323 Some(Self::project(
324 self.projection.as_deref(),
325 row,
326 datum_buf,
327 row_buf,
328 ))
329 }
330
331 fn peek(&mut self) -> Option<&RowRef> {
332 if let Some(0) = self.limit {
334 return None;
335 }
336
337 let row = self.collection.get(self.row_idx).map(|(r, _)| r)?;
338
339 let (datum_buf, row_buf) = &mut self.projection_buf;
344 Some(Self::project(
345 self.projection.as_deref(),
346 row,
347 datum_buf,
348 row_buf,
349 ))
350 }
351
352 fn count(&self) -> usize {
353 RowCollection::offset_limit(self.collection.count(), self.offset, self.limit)
354 }
355
356 fn box_clone(&self) -> Box<dyn RowIterator> {
357 Box::new(self.clone())
358 }
359}
360
361impl IntoRowIterator for RowCollection {
362 type Iter = RowCollectionIter;
363
364 fn into_row_iter(self) -> Self::Iter {
365 RowCollectionIter {
366 collection: self,
367 row_idx: 0,
368 diff_idx: 0,
369 limit: None,
370 offset: 0,
371 projection: None,
372 projection_buf: (DatumVec::new(), Row::default()),
374 }
375 }
376}
377
#[cfg(test)]
mod tests {
    use std::borrow::Borrow;

    use mz_ore::assert_none;
    use mz_repr::Datum;
    use proptest::prelude::*;
    use proptest::test_runner::Config;

    use super::*;

    /// Test-only convenience: builds a collection holding each row once, in iteration order.
    impl<'a, T: IntoIterator<Item = &'a Row>> From<T> for RowCollection {
        fn from(rows: T) -> Self {
            let mut builder = RowCollection::builder(0, 0);
            for row in rows {
                builder.push(row.as_row_ref(), NonZeroUsize::MIN);
            }
            builder.build()
        }
    }

    #[mz_ore::test]
    fn test_row_collection() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let collection = RowCollection::from([&a, &b]);
        assert_eq!(collection.entries(), 2);
        assert_eq!(collection.count(), 2);

        let (a_rnd, _) = collection.get(0).unwrap();
        assert_eq!(a_rnd, a.borrow());

        let (b_rnd, _) = collection.get(1).unwrap();
        assert_eq!(b_rnd, b.borrow());

        // Nothing past the end.
        assert_none!(collection.get(2));
    }

    #[mz_ore::test]
    fn test_merge() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);

        let mut a_col = RowCollection::from([&a]);
        let b_col = RowCollection::from([&b]);

        a_col.concat(&b_col);

        assert_eq!(a_col.count(), 2);
        assert_eq!(a_col.entries(), 2);
        assert_eq!(a_col.get(0).map(|(r, _)| r), Some(a.borrow()));
        assert_eq!(a_col.get(1).map(|(r, _)| r), Some(b.borrow()));
        assert_none!(a_col.get(2));
    }

    #[mz_ore::test]
    fn test_concat_empty() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let a_col = RowCollection::from([&a]);

        // Concatenating an empty collection is a no-op.
        let mut col = a_col.clone();
        col.concat(&RowCollection::default());
        assert_eq!(col.entries(), 1);
        assert_eq!(col.count(), 1);
        assert_eq!(col.get(0).map(|(r, _)| r), Some(a.borrow()));

        // Concatenating onto an empty collection yields the other collection's rows.
        let mut col = RowCollection::default();
        col.concat(&a_col);
        assert_eq!(col.entries(), 1);
        assert_eq!(col.count(), 1);
        assert_eq!(col.get(0).map(|(r, _)| r), Some(a.borrow()));
        assert_none!(col.get(1));
    }

    #[mz_ore::test]
    fn test_sort() {
        let a = Row::pack_slice(&[Datum::False, Datum::String("hello world"), Datum::Int16(42)]);
        let b = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(10))]);
        let c = Row::pack_slice(&[Datum::True, Datum::String("hello world"), Datum::Int16(42)]);
        let d = Row::pack_slice(&[Datum::MzTimestamp(mz_repr::Timestamp::new(9))]);

        let cols = {
            let mut part = [&a, &b];
            part.sort_by(|a, b| a.cmp(b));
            let part1 = RowCollection::from(part);
            let mut part = [&c, &d];
            part.sort_by(|a, b| a.cmp(b));
            let part2 = RowCollection::from(part);
            vec![part1, part2]
        };
        let mut rows = [a, b, c, d];

        let sorted_view = RowCollection::merge_sorted(&cols, &[]);
        rows.sort_by(|a, b| a.cmp(b));

        assert_eq!(sorted_view.entries(), rows.len());
        for i in 0..rows.len() {
            let (row_x, _) = sorted_view.get(i).unwrap();
            let row_y = rows.get(i).unwrap();

            assert_eq!(row_x, row_y.borrow());
        }
    }

    #[mz_ore::test]
    fn test_sorted_iter() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = RowCollection::merge_sorted(
            &[
                col,
                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
            ],
            &[],
        );
        let mut iter = col.into_row_iter();

        assert_eq!(iter.peek(), Some(b.borrow()));

        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);

        // Exhausted iterators stay exhausted.
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
    }

    #[mz_ore::test]
    fn test_sorted_iter_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = RowCollection::merge_sorted(
            &[
                col,
                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
            ],
            &[],
        );

        // Offset into the middle of the first row's copies.
        let mut iter = col.into_row_iter().apply_offset(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Offset past the first row entirely.
        let mut iter = col.into_row_iter().apply_offset(3);

        assert_eq!(iter.peek(), Some(a.borrow()));

        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Offset past the end of the collection.
        let mut iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
    }

    #[mz_ore::test]
    fn test_sorted_iter_limit() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);
        let col = RowCollection::merge_sorted(
            &[
                col,
                RowCollection::new(vec![(b.clone(), NonZeroUsize::new(2).unwrap())], &[]),
            ],
            &[],
        );

        let mut iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        let mut iter = col.into_row_iter().with_limit(4);
        assert_eq!(iter.peek(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));

        assert_eq!(iter.peek(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));

        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // A limit larger than the collection yields everything.
        let mut iter = col.into_row_iter().with_limit(1000);
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(b.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), Some(a.borrow()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // A zero limit yields nothing, not even for `peek`.
        let mut iter = col.into_row_iter().with_limit(0);
        assert_eq!(iter.peek(), None);
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    #[mz_ore::test]
    fn test_mapped_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(3).unwrap())], &[]);

        let iter: Box<dyn RowIterator> = Box::new(col.into_row_iter());

        let mut mapped = iter.map(|f| f.to_owned());
        assert!(mapped.next().is_some());
        assert!(mapped.next().is_some());
        assert!(mapped.next().is_some());
        assert_none!(mapped.next());
        assert_none!(mapped.next());
    }

    #[mz_ore::test]
    fn test_projected_row_iterator() {
        let a = Row::pack_slice(&[Datum::String("hello world"), Datum::Int16(42)]);
        let col = RowCollection::new(vec![(a.clone(), NonZeroUsize::new(2).unwrap())], &[]);

        // Project a single column.
        let mut iter = col.into_row_iter().with_projection(vec![1]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42)]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Project away every column.
        let mut iter = col.into_row_iter().with_projection(vec![]);

        let projected_a = Row::default();
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Identity projection.
        let mut iter = col.into_row_iter().with_projection(vec![0, 1]);

        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), Some(a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);

        let col = iter.into_inner();

        // Re-ordering projection.
        let mut iter = col.into_row_iter().with_projection(vec![1, 0]);

        let projected_a = Row::pack_slice(&[Datum::Int16(42), Datum::String("hello world")]);
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), Some(projected_a.as_ref()));
        assert_eq!(iter.next(), None);
        assert_eq!(iter.next(), None);
    }

    #[mz_ore::test]
    fn test_count_respects_limit_and_offset() {
        let a = Row::pack_slice(&[Datum::String("hello world")]);
        let b = Row::pack_slice(&[Datum::UInt32(42)]);
        let col = RowCollection::new(
            vec![
                (a.clone(), NonZeroUsize::new(3).unwrap()),
                (b.clone(), NonZeroUsize::new(2).unwrap()),
            ],
            &[],
        );

        let iter = col.into_row_iter();
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        let iter = col.into_row_iter().with_limit(1);
        assert_eq!(iter.count(), 1);

        let col = iter.into_inner();

        let iter = col.into_row_iter().with_limit(100);
        assert_eq!(iter.count(), 5);

        let col = iter.into_inner();

        let iter = col.into_row_iter().apply_offset(3);
        assert_eq!(iter.count(), 2);

        let col = iter.into_inner();

        let iter = col.into_row_iter().apply_offset(100);
        assert_eq!(iter.count(), 0);

        let col = iter.into_inner();

        let iter = col.into_row_iter().with_limit(2).apply_offset(4);
        assert_eq!(iter.count(), 1);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_row_collection() {
        fn row_collection_roundtrips(rows: Vec<Row>) {
            let collection = RowCollection::from(&rows);

            // Exactly one entry per input row, each with a diff of one.
            assert_eq!(collection.entries(), rows.len());
            assert_eq!(collection.count(), rows.len());

            for i in 0..rows.len() {
                let (a, _) = collection.get(i).unwrap();
                let b = rows.get(i).unwrap().borrow();

                assert_eq!(a, b);
            }
            assert_none!(collection.get(rows.len()));
        }

        proptest!(
            Config { cases: 5, ..Default::default() },
            |(rows in any::<Vec<Row>>())| {
                row_collection_roundtrips(rows)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_merge() {
        fn row_collection_merge(a: Vec<Row>, b: Vec<Row>) {
            let mut a_col = RowCollection::from(&a);
            let b_col = RowCollection::from(&b);

            a_col.concat(&b_col);

            assert_eq!(a_col.entries(), a.len() + b.len());

            let all_rows = a.iter().chain(b.iter());
            for (idx, row) in all_rows.enumerate() {
                let (col_row, _) = a_col.get(idx).unwrap();
                assert_eq!(col_row, row.borrow());
            }
            assert_none!(a_col.get(a.len() + b.len()));
        }

        proptest!(
            Config { cases: 3, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                row_collection_merge(a, b)
            }
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_sort() {
        fn row_collection_sort(mut a: Vec<Row>, mut b: Vec<Row>) {
            a.sort_by(|a, b| a.cmp(b));
            b.sort_by(|a, b| a.cmp(b));

            let sorted_view = RowCollection::merge_sorted(
                &[RowCollection::from(&a), RowCollection::from(&b)],
                &[],
            );

            a.append(&mut b);
            a.sort_by(|a, b| a.cmp(b));

            assert_eq!(sorted_view.entries(), a.len());
            for i in 0..a.len() {
                let (row_x, _) = sorted_view.get(i).unwrap();
                let row_y = a.get(i).unwrap();

                assert_eq!(row_x, row_y.borrow());
            }
        }

        proptest!(
            Config { cases: 5, ..Default::default() },
            |(a in any::<Vec<Row>>(), b in any::<Vec<Row>>())| {
                row_collection_sort(a, b)
            }
        );
    }
}