1use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::plan::top_k::{
27 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
28};
29use mz_expr::func::CastUint64ToInt64;
30use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
31use mz_ore::cast::CastFrom;
32use mz_ore::soft_assert_or_log;
33use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
34use mz_storage_types::errors::DataflowError;
35use mz_timely_util::operator::CollectionExt;
36use timely::Container;
37use timely::container::{CapacityContainerBuilder, PushInto};
38use timely::dataflow::Scope;
39use timely::dataflow::channels::pact::Pipeline;
40use timely::dataflow::operators::Operator;
41
42use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
43use crate::extensions::reduce::MzReduce;
44use crate::render::Pairer;
45use crate::render::context::{CollectionBundle, Context};
46use crate::render::errors::MaybeValidatingRow;
47use crate::row_spine::{
48 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
49};
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
    /// Renders a [`TopKPlan`] against the `input` bundle, returning a bundle
    /// with the plan's output rows and an error stream extended with any
    /// problems encountered along the way (negative or failing LIMIT
    /// expressions, non-monotonic input to the monotonic variants, negative
    /// multiplicities observed during validation).
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<G>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<G> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // Build all TopK operators inside a dedicated, named region of the
        // dataflow graph.
        let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Validate the LIMIT expression. No limit, a NULL literal, or a
            // non-negative integer literal needs no runtime checking; any
            // other expression is evaluated once per input row so that
            // negative limits and evaluation errors surface on the error
            // stream.
            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                None => {}
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                Some((_, expr)) => {
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.clone().flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // The limit expression is later evaluated against the
                    // extracted group key, so remap its column references
                    // from input positions to key positions.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    // Key each row by its packed group key.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let mut ok_scope = ok_input.scope();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // The monotonic strategy is only sound for append-only
                    // input; divert any retractions into the error stream.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(errs);

                    // When a limit is known, pre-thin each timestamp's worth
                    // of updates before they reach the arrangement below.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    // Prepend a hash of the full row to the group key; the
                    // stage below folds this hash modulo its bucket size.
                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // Feedback loop: rows that did not make the result are
                    // fed back (delayed by `delay` in system time) as
                    // retractions, thinning what the stage has to retain.
                    let delay = std::time::Duration::from_secs(10);
                    let (retractions_var, retractions) = SemigroupVariable::new(
                        &mut ok_scope,
                        <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.clone().concat(retractions.negate());

                    // A single stage (modulus 1, offset 0) suffices here;
                    // validation is disabled, so no errors are expected back.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        result,
                        "Monotonic TopK final consolidate",
                    );
                    // Retract everything that is not part of the result.
                    retractions_var.set(collection.concat(result.clone().negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // As above: rewrite limit column references from input
                    // positions to group-key positions.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
            };

            (ok_result.leave_region(), err_collection.leave_region())
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Constructs a hierarchy of stages for the basic (non-monotonic) TopK
    /// plan: one stage per entry in `buckets`, each reducing within
    /// `hash % bucket` subgroups, followed by a final stage over whole groups
    /// (modulus 1) that applies the real `offset`.
    ///
    /// Only the first rendered stage validates its input for negative
    /// multiplicities; its error output becomes the returned error stream.
    fn build_topk<S>(
        &self,
        collection: VecCollection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Key rows by a row-hash prefix followed by the group key datums.
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        // Only the first stage we render validates multiplicities.
        let mut validating = true;
        let mut err_collection: Option<VecCollection<S, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // Intermediate stages must keep `limit + offset` rows per group
            // so the final stage can still skip `offset` of them.
            if offset > 0 {
                // Fold a literal offset into a literal limit directly, ...
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit =
                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
                } else {
                    // ... otherwise compute `limit + offset` at runtime, with
                    // the offset cast from uint64 to int64 before the add.
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            ReprScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64(func::AddInt64),
                    );
                }
            }

            // Each bucket stage caps `hash % bucket` subgroups at the
            // (adjusted) limit, shrinking the data seen by later stages.
            for bucket in buckets.into_iter() {
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // Final stage over whole groups applies the true offset.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        let oks =
            CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
        collection = oks;
        if validating {
            err_collection = errs;
        }
        (
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// Renders a single TopK stage over `(hash ++ group key, row)` pairs.
    ///
    /// The leading hash datum is reduced modulo `modulus` to form subgroups.
    /// The negated stage emits retractions for rows outside the requested
    /// `offset`/`limit` window, so concatenating its output with the stage
    /// input leaves exactly the surviving rows. With `validating` set, rows
    /// observed with negative multiplicities are demuxed into the returned
    /// error collection (otherwise `None`).
    fn build_topk_stage<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        VecCollection<S, (Row, Row), Diff>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Replace the leading hash datum by `hash % modulus`.
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        let (input, oks, errs) = if validating {
            // Validating path: values are `Result<Row, Row>` so the reduction
            // can signal rows it saw with negative multiplicities.
            let (input, stage) = build_topk_negated_stage::<
                S,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));

            // Split the stage output into ok rows and dataflow errors.
            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            let (input, stage) =
                build_topk_negated_stage::<S, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, stage, None)
        };
        // Adding the (negated) stage output back to the input retains
        // exactly the rows within the window.
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        (oks.concat(input), errs)
    }

    /// Renders the monotonic Top1 plan: each row is wrapped in a
    /// [`monoids::Top1Monoid`] and moved into the difference component, so
    /// that for append-only input consolidation itself keeps only the
    /// minimal (per `order_key`) row of each group.
    fn render_top1_monotonic<S>(
        &self,
        collection: VecCollection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Key each row by its packed group key.
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // Divert retractions, which the monotonic strategy cannot handle,
        // into the error stream.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move the row into the difference component; `Top1Monoid`'s
        // semigroup addition keeps the winning row per group.
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    // The accumulated monoid already holds the winning row.
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        (result.as_collection(|_k, v| v.to_row()), errs)
    }
}
522
/// Builds the negated reduction for one TopK stage.
///
/// Returns the arranged stage input together with a reduction whose output is
/// the *change* needed to thin each group: every row negated, plus the rows
/// inside the `offset`/`limit` window re-asserted. Adding this output back to
/// the stage input therefore leaves exactly the window; groups that already
/// fit produce no output at all.
///
/// `limit` may be a per-group expression, evaluated against the group key
/// (without the leading hash datum). `arity` is the column count of each
/// value row. The builder/trace parameters `Bu`/`Tr` select between the
/// validating (`Result<Row, Row>` values) and non-validating (`Row` values)
/// instantiations.
fn build_topk_negated_stage<G, Bu, Tr>(
    input: &VecCollection<G, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
    Arranged<G, TraceAgent<Tr>>,
)
where
    G: Scope,
    G::Timestamp: MzTimestamp,
    Bu: Builder<
        Time = G::Timestamp,
        Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), G::Timestamp, Diff)>,
        Output = Tr::Batch,
    >,
    Tr: for<'a> Trace<
        Key<'a> = DatumSeq<'a>,
        KeyOwn = Row,
        ValOwn: Data + MaybeValidatingRow<Row, Row>,
        Time = G::Timestamp,
        Diff = Diff,
    > + 'static,
    Arranged<G, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    let arranged = input
        .clone()
        .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "Arranged TopK input",
        );

    // Pre-classify the limit once: a NULL literal means "unbounded", other
    // literals convert to a `Diff` up front; non-literal expressions are kept
    // and evaluated per group inside the reduction.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged
        .clone()
        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
            move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
                // Resolve this group's limit, evaluating a non-literal
                // expression against the key datums (skipping the leading
                // hash). Evaluation errors fall back to a limit of 0;
                // NOTE(review): the error itself is presumably reported by
                // the validating stage elsewhere — confirm.
                let limit = match &limit {
                    Some(Ok(lit)) => Some(*lit),
                    Some(Err(expr)) => {
                        let temp_storage = mz_repr::RowArena::new();
                        let _hash = hash_key.next();
                        let mut key_datums = datum_vec.borrow();
                        key_datums.extend(hash_key);
                        let datum_limit = expr
                            .eval(&key_datums, &temp_storage)
                            .unwrap_or(Datum::Int64(0));
                        Some(match datum_limit {
                            Datum::Null => Diff::MAX,
                            d => Diff::from(d.unwrap_int64()),
                        })
                    }
                    None => None,
                };

                // Validating instantiation only: if any row carries a
                // non-positive multiplicity, emit a single error value for
                // this group and stop.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (datums, diff) in source.iter() {
                        if diff.is_positive() {
                            continue;
                        }
                        target.push((err((*datums).to_row()), Diff::ONE));
                        return;
                    }
                }

                // Fast path: nothing to thin when no offset applies and the
                // group's total multiplicity already fits the limit.
                let must_shrink = offset > 0
                    || limit
                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                        .unwrap_or(false);
                if !must_shrink {
                    return;
                }

                // Retract the entire group; the surviving window is
                // re-asserted below.
                target.reserve(source.len());
                for (datums, diff) in source.iter() {
                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
                }
                let mut offset = offset;
                let mut limit = limit;

                // Flatten all rows into one datum buffer (each row occupies
                // `arity` slots) and sort row indexes by the ordering key,
                // breaking ties with a full-row comparison.
                let mut indexes = (0..source.len()).collect::<Vec<_>>();
                let mut buffer = datum_vec.borrow();
                for (index, (datums, _)) in source.iter().enumerate() {
                    buffer.extend(*datums);
                    assert_eq!(buffer.len(), arity * (index + 1));
                }
                let width = buffer.len() / source.len();

                indexes.sort_by(|left, right| {
                    let left = &buffer[left * width..][..width];
                    let right = &buffer[right * width..][..width];
                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
                });

                // Walk rows in sorted order: first consume `offset` copies,
                // then emit up to `limit` copies.
                for index in indexes.into_iter() {
                    let (datums, mut diff) = source[index];
                    if !diff.is_positive() {
                        continue;
                    }
                    if offset > 0 {
                        let to_skip =
                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                        offset -= to_skip;
                        diff -= Diff::try_from(to_skip).unwrap();
                    }
                    if let Some(limit) = &mut limit {
                        diff = std::cmp::min(diff, Diff::from(*limit));
                        *limit -= diff;
                    }
                    if diff.is_positive() {
                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                    }
                }
            }
        });
    (arranged, reduced)
}
681
/// Thins monotonic TopK input within each timestamp: updates are buffered per
/// outer capability time in a [`topk_agg::TopKBatch`] keyed by
/// `(group row, inner record time)`, and only the rows that survive the
/// per-group `limit` are emitted once the timestamp is complete.
///
/// The ordering key and comparison scratch space are shared across all
/// [`monoids::Top1MonoidLocal`] values via a single `Rc<RefCell<_>>`.
fn render_intra_ts_thinning<S>(
    collection: VecCollection<S, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> VecCollection<S, (Row, Row), Diff>
where
    S: Scope,
    S::Timestamp: Lattice,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // Pending batches, keyed by capability time.
    let mut aggregates = BTreeMap::new();
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                input.for_each_time(|time, data| {
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
                    {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Resolve the limit for this group: literals short-
                        // circuit; otherwise evaluate against the group row.
                        // NULL means unbounded; evaluation errors fall back
                        // to 0 (NOTE(review): presumably the error is
                        // reported by the later validating machinery —
                        // confirm).
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    // Ask to be notified once this time is complete.
                    notificator.notify_at(time.retain(0));
                });

                // Emit the surviving rows for every completed time.
                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}
761
762pub mod topk_agg {
764 use differential_dataflow::consolidation;
765 use smallvec::SmallVec;
766
767 pub struct TopKBatch<T> {
774 updates: SmallVec<[(T, i64); 16]>,
775 clean: usize,
776 limit: i64,
777 }
778
779 impl<T: Ord> TopKBatch<T> {
780 pub fn new(limit: i64) -> Self {
781 Self {
782 updates: SmallVec::new(),
783 clean: 0,
784 limit,
785 }
786 }
787
788 #[inline]
795 pub fn update(&mut self, item: T, value: i64) {
796 self.updates.push((item, value));
797 self.maintain_bounds();
798 }
799
800 #[inline]
806 pub fn compact(&mut self) {
807 if self.clean < self.updates.len() && self.updates.len() > 1 {
808 let len = consolidation::consolidate_slice(&mut self.updates);
809 self.updates.truncate(len);
810
811 let mut limit = self.limit;
813 self.updates.retain(|x| {
814 if limit > 0 {
815 limit -= x.1;
816 true
817 } else {
818 false
819 }
820 });
821 if limit < 0 {
829 if let Some(item) = self.updates.last_mut() {
830 item.1 -= -limit;
833 }
834 }
835 }
836 self.clean = self.updates.len();
837 }
838
839 fn maintain_bounds(&mut self) {
842 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
844 self.compact()
845 }
846 }
847 }
848
849 impl<T: Ord> IntoIterator for TopKBatch<T> {
850 type Item = (T, i64);
851 type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
852
853 fn into_iter(mut self) -> Self::IntoIter {
854 self.compact();
855 self.updates.into_iter()
856 }
857 }
858}
859
pub mod monoids {
    //! Semigroups used by the monotonic Top1/TopK renderings. The candidate
    //! row lives in the difference component, so differential consolidation
    //! itself performs the per-group reduction.

    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A semigroup value that, when combined, retains the smaller row
    /// according to `order_key`.
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    // Hand-written so that `clone_from` can reuse the destination's existing
    // row and order-key allocations.
    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        // Scaling by a multiplicity does not change which row wins, so this
        // is the identity; non-positive factors are rejected outright.
        fn multiply(self, factor: &Diff) -> Self {
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert_eq!(self.order_key, other.order_key);

            // Compare by the ordering key, falling back to a full-row
            // comparison so the order is total.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        // Addition keeps the smaller (winning) of the two rows.
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        // A value always carries a row, so it never cancels to zero.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    /// Columnation region for [`Top1Monoid`], delegating to the regions of
    /// its two fields.
    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        // SAFETY: forwards to the inner regions' `copy`; callers must uphold
        // the `Region::copy` contract (copies remain valid only until
        // `clear`), exactly as for the delegated-to implementations.
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// State shared by all [`Top1MonoidLocal`] values of a single operator:
    /// the ordering key plus two scratch `DatumVec`s used when comparing
    /// rows, avoiding a per-comparison allocation.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A worker-local analogue of [`Top1Monoid`] that borrows its ordering
    /// key and scratch space from a shared handle instead of owning them.
    /// It derives no `Serialize`/`Deserialize` and holds an `Rc`, so it is
    /// only usable within a single worker.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        /// Consumes the value, yielding the retained row.
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    impl PartialEq for Top1MonoidLocal {
        // Equality considers only the row; the shared handle is expected to
        // be common to all compared values.
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            // Both sides must refer to the same shared state, which is
            // mutably borrowed for the duration of the comparison.
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        // Addition keeps the smaller (winning) of the two rows.
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        // A value always carries a row, so it never cancels to zero.
        fn is_zero(&self) -> bool {
            false
        }
    }
}