1use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::BatchContainer;
24use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Data, VecCollection};
27use mz_compute_types::plan::top_k::{
28 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
29};
30use mz_expr::func::CastUint64ToInt64;
31use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
32use mz_ore::cast::CastFrom;
33use mz_ore::soft_assert_or_log;
34use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
35use mz_storage_types::errors::DataflowError;
36use mz_timely_util::operator::CollectionExt;
37use timely::Container;
38use timely::container::{CapacityContainerBuilder, PushInto};
39use timely::dataflow::channels::pact::Pipeline;
40use timely::dataflow::operators::Operator;
41
42use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
43use crate::extensions::reduce::{ClearContainer, MzReduce};
44use crate::render::Pairer;
45use crate::render::context::{CollectionBundle, Context};
46use crate::render::errors::MaybeValidatingRow;
47use crate::row_spine::{
48 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
49};
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51
impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> {
    /// Renders a `TopKPlan` against `input`, producing a bundle with the
    /// per-group top-k rows plus any errors raised along the way.
    ///
    /// The whole fragment is built inside a dedicated "TopK" region so it
    /// appears as one unit in dataflow introspection.
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<'scope, T>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<'scope, T> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        let outer_scope = ok_input.scope();
        let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // If the limit is a non-literal expression (or a literal that we
            // cannot prove non-negative), emit an error for every input row
            // whose evaluated limit is negative, and for evaluation errors.
            // A NULL or non-negative literal needs no check; `None` means
            // "no limit at all".
            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                None => {}
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                Some((_, expr)) => {
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.clone().flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // The limit expression refers to columns of the full row;
                    // downstream it is evaluated against the group key only,
                    // so remap its column references accordingly.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    // Key the input by group, consolidating first if the plan
                    // requires it.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let ok_scope = ok_input.scope();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // The monotonic variant assumes append-only input; turn
                    // any retraction into an internal error rather than
                    // silently producing wrong results.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(errs);

                    // Pre-thin each timestamp's updates down to the candidate
                    // top rows per group, to bound what the stateful stage
                    // below has to absorb.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    // Prefix the group key with the row hash so the stage can
                    // distribute work by hash.
                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // Feedback loop: after a system-time delay, retract from
                    // the stage's input everything that did not make it into
                    // the result, so the arrangement eventually retains only
                    // the current top-k rows per group.
                    let delay = std::time::Duration::from_secs(10);
                    let (retractions_var, retractions) = SemigroupVariable::new(
                        ok_scope,
                        <T as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.clone().concat(retractions.negate());

                    // A single stage with modulus 1 computes the final top-k.
                    // Validation is not requested: monotonicity was already
                    // enforced above.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    // Consolidate before feeding back so retractions stay compact.
                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        result,
                        "Monotonic TopK final consolidate",
                    );
                    retractions_var.set(collection.concat(result.clone().negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // As above: rewrite the limit expression in terms of the
                    // group key, which is what it is evaluated against later.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
            };

            // Hand both streams back to the enclosing scope.
            (
                ok_result.leave_region(outer_scope),
                err_collection.leave_region(outer_scope),
            )
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Builds a basic (non-monotonic) top-k: a hierarchy of bucketed reduction
    /// stages (one per entry in `buckets`), followed by a final stage that
    /// applies `offset` and `limit` exactly.
    ///
    /// Only one stage validates multiplicities (the first to run); the
    /// returned error collection comes from that stage.
    fn build_topk<'s>(
        &self,
        collection: VecCollection<'s, T, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (
        VecCollection<'s, T, Row, Diff>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        // Key each row by (row hash, group key) so stages can shard by hash.
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        let mut validating = true;
        let mut err_collection: Option<VecCollection<'s, T, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // The bucketed stages below run with an offset of zero, so they
            // must retain `offset` extra rows: fold the offset into this
            // (shadowed) limit, either statically when the limit is a literal,
            // or by building a runtime `limit + offset` expression. The final
            // stage still uses the original `limit` together with `offset`.
            if offset > 0 {
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit =
                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
                } else {
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            ReprScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64(func::AddInt64),
                    );
                }
            }

            // Each bucket coarsens groups by `bucket` hash values and thins
            // them to the adjusted limit; only the first stage validates.
            for bucket in buckets.into_iter() {
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // Final stage with modulus 1 applies the exact offset and limit.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        let oks =
            CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
        collection = oks;
        if validating {
            err_collection = errs;
        }
        (
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// Renders a single top-k reduction stage.
    ///
    /// Input rows arrive keyed by (hash, group key); this stage reduces the
    /// hash prefix modulo `modulus` to form buckets, so a larger modulus means
    /// coarser buckets. When `validating` is true the stage also detects
    /// negative multiplicities and returns them as `Some(errors)`; otherwise
    /// the second return value is `None`.
    fn build_topk_stage<'s>(
        &self,
        collection: VecCollection<'s, T, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        VecCollection<'s, T, (Row, Row), Diff>,
        Option<VecCollection<'s, T, DataflowError, Diff>>,
    ) {
        // Replace the hash prefix of the key with `hash % modulus`.
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        // The negated stage emits retractions for the records that must NOT
        // survive; concatenating its output with the input (at the bottom)
        // leaves exactly the top-k rows.
        let (input, oks, errs) = if validating {
            // Validating path: values are `Result<Row, Row>` so a negative
            // multiplicity can be smuggled out as `Err` and demuxed here.
            let (input, stage) = build_topk_negated_stage::<
                T,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));

            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        // Split the (hash, key) pair back apart for logging.
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            // Non-validating path: plain row values, no error stream.
            let (input, stage) =
                build_topk_negated_stage::<T, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, stage, None)
        };
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        (oks.concat(input), errs)
    }

    /// Renders a monotonic top-1: for append-only input the per-group best
    /// row (under `order_key`) can be maintained as a semigroup accumulation,
    /// which is far cheaper than the general top-k machinery.
    fn render_top1_monotonic<'s>(
        &self,
        collection: VecCollection<'s, T, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (
        VecCollection<'s, T, Row, Diff>,
        VecCollection<'s, T, DataflowError, Diff>,
    ) {
        // Key by group, consolidating first if the plan requires it.
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // The semigroup approach is only sound for monotonic (append-only)
        // input; turn any retraction into an internal error.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move each row into the difference component as a `Top1Monoid`; the
        // semigroup merge then keeps only the winning row per group.
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    // The accumulated monoid already holds the winning row.
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        (result.as_collection(|_k, v| v.to_row()), errs)
    }
}
513
/// Builds the arrangement-and-reduction at the heart of a top-k stage.
///
/// Returns the arrangement of the stage's input together with an arrangement
/// of its *negation*: for each group the reduction emits net retractions for
/// every record that does not belong to the top `limit` rows after skipping
/// `offset`. Concatenating this output with the input therefore yields exactly
/// the top-k. Emitting the complement keeps the output small for groups that
/// already fit within the limit.
///
/// When `Tr::ValOwn` is a validating row type, a record with non-positive
/// multiplicity is reported through its error variant instead.
fn build_topk_negated_stage<'s, T, Bu, Tr>(
    input: &VecCollection<'s, T, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
    Arranged<'s, TraceAgent<Tr>>,
)
where
    T: MzTimestamp,
    Bu: Builder<
        Time = T,
        Input: Container
            + InternalMerge
            + ClearContainer
            + PushInto<((Row, Tr::ValOwn), T, Diff)>,
        Output = Tr::Batch,
    >,
    Tr: for<'a> Trace<
        Key<'a> = DatumSeq<'a>,
        KeyContainer: BatchContainer<Owned = Row>,
        ValOwn: Data + MaybeValidatingRow<Row, Row>,
        Time = T,
        Diff = Diff,
    > + 'static,
    Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    let arranged = input
        .clone()
        .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "Arranged TopK input",
        );

    // Pre-resolve a literal limit: NULL means "no limit" (Diff::MAX); a
    // non-literal expression is kept (`Err`) for per-group evaluation below.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged
        .clone()
        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
            move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
                // Determine this group's effective limit, evaluating the
                // expression against the group key (the hash prefix is
                // skipped). Evaluation errors fall back to a limit of 0;
                // presumably the error itself is surfaced by the limit check
                // in `render_topk` -- TODO confirm.
                let limit = match &limit {
                    Some(Ok(lit)) => Some(*lit),
                    Some(Err(expr)) => {
                        let temp_storage = mz_repr::RowArena::new();
                        let _hash = hash_key.next();
                        let mut key_datums = datum_vec.borrow();
                        key_datums.extend(hash_key);
                        let datum_limit = expr
                            .eval(&key_datums, &temp_storage)
                            .unwrap_or(Datum::Int64(0));
                        Some(match datum_limit {
                            Datum::Null => Diff::MAX,
                            d => Diff::from(d.unwrap_int64()),
                        })
                    }
                    None => None,
                };

                // Validating mode: report the first record with non-positive
                // multiplicity as an error and stop processing this group.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (datums, diff) in source.iter() {
                        if diff.is_positive() {
                            continue;
                        }
                        target.push((err((*datums).to_row()), Diff::ONE));
                        return;
                    }
                }

                // Fast path: with no offset and a group that already fits the
                // limit, nothing needs to be retracted.
                let must_shrink = offset > 0
                    || limit
                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                        .unwrap_or(false);
                if !must_shrink {
                    return;
                }

                // Retract everything up front; survivors are re-inserted with
                // positive diffs below, leaving net retractions for the rest.
                target.reserve(source.len());
                for (datums, diff) in source.iter() {
                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
                }
                // Shadow with mutable copies that get consumed per record.
                let mut offset = offset;
                let mut limit = limit;

                // Unpack all rows into one flat datum buffer so they can be
                // sorted by `order_key` without repeated decoding.
                let mut indexes = (0..source.len()).collect::<Vec<_>>();
                let mut buffer = datum_vec.borrow();
                for (index, (datums, _)) in source.iter().enumerate() {
                    buffer.extend(*datums);
                    assert_eq!(buffer.len(), arity * (index + 1));
                }
                let width = buffer.len() / source.len();

                // Sort record indexes by the order key, breaking ties with a
                // full column-wise comparison for determinism.
                indexes.sort_by(|left, right| {
                    let left = &buffer[left * width..][..width];
                    let right = &buffer[right * width..][..width];
                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
                });

                // Walk records in order, first consuming `offset` copies and
                // then up to `limit`; any multiplicity that survives both is
                // re-emitted (cancelling its blanket retraction above).
                for index in indexes.into_iter() {
                    let (datums, mut diff) = source[index];
                    if !diff.is_positive() {
                        continue;
                    }
                    if offset > 0 {
                        let to_skip =
                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                        offset -= to_skip;
                        diff -= Diff::try_from(to_skip).unwrap();
                    }
                    if let Some(limit) = &mut limit {
                        diff = std::cmp::min(diff, Diff::from(*limit));
                        *limit -= diff;
                    }
                    if diff.is_positive() {
                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                    }
                }
            }
        });
    (arranged, reduced)
}
674
/// Thins each (group, timestamp) batch of a monotonic top-k input before the
/// stateful stage: within a single timestamp only the best `limit` rows per
/// group can contribute to the result, so the remainder is dropped eagerly via
/// a `TopKBatch` accumulator.
fn render_intra_ts_thinning<'s, T>(
    collection: VecCollection<'s, T, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> VecCollection<'s, T, (Row, Row), Diff>
where
    T: timely::progress::Timestamp + Lattice,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // Pending per-capability-time aggregates, drained when notified.
    let mut aggregates = BTreeMap::new();
    // Comparison state shared by all `Top1MonoidLocal` values of this
    // operator, so each monoid only has to carry its row.
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                input.for_each_time(|time, data| {
                    // Accumulate updates per (group, record time) under the
                    // capability's time.
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
                    {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Resolve the limit: literals short-circuit; otherwise
                        // evaluate against the group key, mapping NULL to
                        // "unlimited" and evaluation errors to 0 (presumably
                        // the error itself is reported by the limit check in
                        // `render_topk` -- TODO confirm).
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    notificator.notify_at(time.retain(0));
                });

                // Once a timestamp is complete, emit the retained updates.
                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}
753
754pub mod topk_agg {
756 use differential_dataflow::consolidation;
757 use smallvec::SmallVec;
758
759 pub struct TopKBatch<T> {
766 updates: SmallVec<[(T, i64); 16]>,
767 clean: usize,
768 limit: i64,
769 }
770
771 impl<T: Ord> TopKBatch<T> {
772 pub fn new(limit: i64) -> Self {
773 Self {
774 updates: SmallVec::new(),
775 clean: 0,
776 limit,
777 }
778 }
779
780 #[inline]
787 pub fn update(&mut self, item: T, value: i64) {
788 self.updates.push((item, value));
789 self.maintain_bounds();
790 }
791
792 #[inline]
798 pub fn compact(&mut self) {
799 if self.clean < self.updates.len() && self.updates.len() > 1 {
800 let len = consolidation::consolidate_slice(&mut self.updates);
801 self.updates.truncate(len);
802
803 let mut limit = self.limit;
805 self.updates.retain(|x| {
806 if limit > 0 {
807 limit -= x.1;
808 true
809 } else {
810 false
811 }
812 });
813 if limit < 0 {
821 if let Some(item) = self.updates.last_mut() {
822 item.1 -= -limit;
825 }
826 }
827 }
828 self.clean = self.updates.len();
829 }
830
831 fn maintain_bounds(&mut self) {
834 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
836 self.compact()
837 }
838 }
839 }
840
841 impl<T: Ord> IntoIterator for TopKBatch<T> {
842 type Item = (T, i64);
843 type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
844
845 fn into_iter(mut self) -> Self::IntoIter {
846 self.compact();
847 self.updates.into_iter()
848 }
849 }
850}
851
/// Semigroup "difference" types used to maintain monotonic top-1 results.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A semigroup whose merge keeps the row that sorts first under
    /// `order_key`. Each value carries its own copy of the ordering so it can
    /// serve as a differential difference type on its own.
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        // Explicit `clone_from` so existing allocations can be reused.
        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // A positive multiplicity does not change which row wins, so the
            // monoid passes through unchanged. A non-positive factor would be
            // a retraction, which the monotonic rendering rules out.
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert_eq!(self.order_key, other.order_key);

            // Unpack both rows and compare by the order key, falling back to
            // full-row comparison so ties break deterministically.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            // Keep whichever operand sorts first under the order key.
            let cmp = (*self).cmp(rhs);
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        // Always carries a row, so it is never a zero that consolidation
        // could drop.
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    /// Columnation region for [`Top1Monoid`], delegating to a sub-region per
    /// field.
    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            // SAFETY: per the `Region::copy` contract, the caller upholds the
            // required invariants; each field is copied into its sub-region.
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // The iterator is cloned so each sub-region sees all items.
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// Ordering state shared by all [`Top1MonoidLocal`] values of a single
    /// operator: the order key plus two scratch `DatumVec`s for comparisons.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// Worker-local variant of [`Top1Monoid`] that shares its order key and
    /// comparison scratch space via `Rc` instead of carrying copies per value.
    /// Not serializable, so only usable within a single worker.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    // Equality and hashing intentionally consider only `row`; `shared` is
    // bookkeeping common to all values of one operator.
    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            // Both operands must share the same state, so one `borrow_mut`
            // covers self and other without double-borrowing the RefCell.
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            // Keep whichever operand sorts first under the shared order key.
            let cmp = (*self).cmp(rhs);
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            false
        }
    }
}