1use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::Data;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::trace::{Builder, Trace};
23use differential_dataflow::{AsCollection, Collection};
24use mz_compute_types::plan::top_k::{
25 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
26};
27use mz_expr::func::CastUint64ToInt64;
28use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc};
29use mz_ore::cast::CastFrom;
30use mz_ore::soft_assert_or_log;
31use mz_repr::{Datum, DatumVec, Diff, Row, ScalarType, SharedRow};
32use mz_storage_types::errors::DataflowError;
33use mz_timely_util::operator::CollectionExt;
34use timely::Container;
35use timely::container::{CapacityContainerBuilder, PushInto};
36use timely::dataflow::Scope;
37use timely::dataflow::channels::pact::Pipeline;
38use timely::dataflow::operators::Operator;
39
40use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
41use crate::extensions::reduce::MzReduce;
42use crate::render::Pairer;
43use crate::render::context::{CollectionBundle, Context};
44use crate::render::errors::MaybeValidatingRow;
45use crate::row_spine::{
46 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
47};
48use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
49
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
    /// Renders a `TopKPlan` against `input`, producing a new bundle of
    /// ok/error collections.
    ///
    /// All variants are built inside a dedicated "TopK" dataflow region.
    /// Before dispatching on the plan variant, a non-literal (or potentially
    /// negative) `limit` expression is evaluated row-by-row so that negative
    /// limits surface as `EvalError::NegLimit` in the error stream.
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<G>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<G> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // Build everything in a named region so the TopK shows up as a unit,
        // accumulating errors into `err_collection` as we go.
        let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Decide whether the limit needs per-row validation:
            // - no limit: nothing to check;
            // - literal limit that is NULL or non-negative: statically fine;
            // - otherwise: evaluate the limit against each row and convert
            //   negative results (or evaluation errors) into dataflow errors.
            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                None => {}
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                Some((_, expr)) => {
                    // Only errors flow out of this map; valid rows yield None.
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(&errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // The limit expression refers to columns by their position
                    // in the input row; remap those references to the positions
                    // the columns occupy within the extracted group key.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    // Key each row by its packed group key, consolidating first
                    // if the plan requires physically monotonic input.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // Defensively detect retractions in supposedly monotonic
                    // input; offending updates are logged and become errors.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(&errs);

                    // Thin each timestamp's updates down to at most `limit`
                    // rows per group before the stateful stage below.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    // Prepend the row hash to the group key so skewed groups
                    // spread across workers.
                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // Feedback loop: rows that did not make the top-k are fed
                    // back (after a delay) as retractions of the input, so
                    // operator state does not retain rows that can never
                    // appear in the output again.
                    use differential_dataflow::operators::iterate::Variable;
                    let delay = std::time::Duration::from_secs(10);
                    let retractions = Variable::new(
                        &mut ok_input.scope(),
                        <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.concat(&retractions.negate());

                    // Single stage (modulus 1, offset 0), no validation: the
                    // monotonicity check above already guards the input.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    // Consolidate so the feedback below retracts a compact
                    // representation of the non-top-k rows.
                    let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
                        "Monotonic TopK final consolidate",
                    );
                    // `input - result` is exactly the set of rows to retract.
                    retractions.set(&collection.concat(&result.negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    // Drop the (hash, group key) prefix; return bare rows.
                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // As above: remap limit column references from input-row
                    // positions to group-key positions.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
            };

            (ok_result.leave_region(), err_collection.leave_region())
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Builds the general (non-monotonic) top-k dataflow as a hierarchy of
    /// reduction stages, one per entry in `buckets`, followed by a final stage
    /// with modulus 1 that applies `offset` and `limit` exactly.
    ///
    /// Only the first stage validates its input against negative
    /// multiplicities; its error stream is the one returned.
    fn build_topk<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Key rows by (row hash, group key) so skewed groups distribute.
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        // `validating` stays true until some stage has performed the negative
        // multiplicity check; only that stage yields an error collection.
        let mut validating = true;
        let mut err_collection: Option<Collection<S, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // Intermediate stages do not apply `offset`, so they must retain
            // up to `limit + offset` rows per bucket. Fold the offset into the
            // limit, pre-computing a literal when both values are literal.
            if offset > 0 {
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit = MirScalarExpr::literal_ok(Datum::Int64(new_limit), ScalarType::Int64);
                } else {
                    // Fall back to evaluating `limit + offset` at runtime
                    // (offset cast u64 -> i64, then added).
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            ScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64,
                    );
                }
            }

            // One reduction stage per bucket; each stage shrinks the candidate
            // set the next (coarser) stage must consider.
            for bucket in buckets.into_iter() {
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // Final stage: modulus 1 collapses the hash buckets, and the true
        // `offset`/`limit` are applied per group.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
        collection = oks;
        if validating {
            err_collection = errs;
        }
        (
            // Strip the (hash, group key) prefix; return bare rows.
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// Builds a single top-k reduction stage.
    ///
    /// The leading hash datum of each key is reduced modulo `modulus`,
    /// bucketing records so intermediate stages work on smaller groups. The
    /// "negated" stage (see `build_topk_negated_stage`) produces the delta
    /// `topk(group) - group`; concatenating that with the stage input yields
    /// the stage's top-k result.
    ///
    /// When `validating` is true, records observed with negative multiplicity
    /// are demuxed into the returned error collection instead of panicking.
    fn build_topk_stage<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        Collection<S, (Row, Row), Diff>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Replace the leading hash with `hash % modulus`, keeping the
        // remainder of the key (the group key) intact.
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        let (input, oks, errs) = if validating {
            // Validating path: values are `Result<Row, Row>`, where `Err`
            // wraps a row seen with non-positive multiplicity.
            let (input, stage) = build_topk_negated_stage::<
                S,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));

            // Demux `Err` values into the error stream, logging the offending
            // key and row for diagnosis.
            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            // Non-validating path: plain row values, no error stream.
            let (input, stage) =
                build_topk_negated_stage::<S, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, stage, None)
        };
        // The negated stage emitted `result - input`; adding the input back
        // reconstitutes the stage's top-k output.
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        (oks.concat(&input), errs)
    }

    /// Renders a monotonic TOP 1 (one row per group) via a monoid reduction:
    /// each row becomes a `Top1Monoid` in the difference component, and the
    /// semigroup combine retains the winning row per group under `order_key`.
    fn render_top1_monotonic<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Key rows by packed group key, consolidating first if required to
        // make the input physically monotonic.
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // Divert non-monotonic updates (retractions) into the error stream.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move each row into the difference component as a `Top1Monoid`:
        // differential's consolidation then computes the per-group top-1.
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    // The accumulated monoid holds the group's winning row.
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        (result.as_collection(|_k, v| v.to_row()), errs)
    }
}
511
/// Builds the core of a single top-k stage as a negated delta.
///
/// Returns the arrangement of `input` together with a reduced arrangement
/// that contains, per key, the update transforming the input group into its
/// top-k: `topk(group) - group`. The caller concatenates that delta with the
/// stage input to obtain the stage result. When a group already satisfies
/// `offset`/`limit`, the reducer emits nothing, so unchanged groups cost no
/// output at all.
///
/// `Tr::ValOwn` is `Result<Row, Row>` in validating stages (where `Err` wraps
/// rows observed with non-positive multiplicity) and plain `Row` otherwise;
/// `MaybeValidatingRow` mediates between the two.
fn build_topk_negated_stage<G, Bu, Tr>(
    input: &Collection<G, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
    Arranged<G, TraceAgent<Tr>>,
)
where
    G: Scope,
    G::Timestamp: MzTimestamp,
    Bu: Builder<
        Time = G::Timestamp,
        Input: Container + PushInto<((Row, Tr::ValOwn), G::Timestamp, Diff)>,
        Output = Tr::Batch,
    >,
    Tr: for<'a> Trace<
        Key<'a> = DatumSeq<'a>,
        KeyOwn = Row,
        ValOwn: Data + MaybeValidatingRow<Row, Row>,
        Time = G::Timestamp,
        Diff = Diff,
    > + 'static,
    Arranged<G, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // Arrange the input once; the arrangement is also handed back to the
    // caller so it can reconstitute the stage output from it.
    let arranged = input.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
        "Arranged TopK input",
    );

    // Pre-resolve a literal limit: NULL means "no limit" (Diff::MAX); a
    // non-literal expression is kept (as `Err`) for per-key evaluation below.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged.mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
        move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
            // Resolve this group's limit, evaluating a non-literal expression
            // against the group key (with the leading hash datum skipped).
            let limit = match &limit {
                Some(Ok(lit)) => Some(*lit),
                Some(Err(expr)) => {
                    let temp_storage = mz_repr::RowArena::new();
                    let _hash = hash_key.next();
                    let mut key_datums = datum_vec.borrow();
                    key_datums.extend(hash_key);
                    // Evaluation errors were already reported in
                    // `render_topk`; default to 0 here rather than panic.
                    let datum_limit = expr
                        .eval(&key_datums, &temp_storage)
                        .unwrap_or(Datum::Int64(0));
                    Some(match datum_limit {
                        Datum::Null => Diff::MAX,
                        d => Diff::from(d.unwrap_int64()),
                    })
                }
                None => None,
            };

            // Validating mode: report the first record with non-positive
            // multiplicity as an error value and stop processing the group.
            if let Some(err) = Tr::ValOwn::into_error() {
                for (datums, diff) in source.iter() {
                    if diff.is_positive() {
                        continue;
                    }
                    target.push((err((*datums).to_row()), Diff::ONE));
                    return;
                }
            }

            // If the group already fits within offset/limit, emit nothing:
            // an empty delta leaves the input unchanged.
            let must_shrink = offset > 0
                || limit
                    .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                    .unwrap_or(false);
            if !must_shrink {
                return;
            }

            // Retract the entire group first; the surviving top-k rows are
            // re-inserted with positive diffs below.
            target.reserve(source.len());
            for (datums, diff) in source.iter() {
                target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
            }
            // Local, mutable copies consumed while walking rows in order.
            let mut offset = offset;
            let mut limit = limit;

            // Materialize every row's datums into one flat scratch buffer so
            // the sort below can compare rows by slice without re-unpacking.
            let mut indexes = (0..source.len()).collect::<Vec<_>>();
            let mut buffer = datum_vec.borrow();
            for (index, (datums, _)) in source.iter().enumerate() {
                buffer.extend(*datums);
                assert_eq!(buffer.len(), arity * (index + 1));
            }
            let width = buffer.len() / source.len();

            // Sort indices by the order key, breaking ties with the full row
            // ordering so results are deterministic.
            indexes.sort_by(|left, right| {
                let left = &buffer[left * width..][..width];
                let right = &buffer[right * width..][..width];
                mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
            });

            // Walk rows in sorted order, consuming `offset` first and then
            // emitting up to `limit`; a single record's diff may be split
            // where it straddles the offset or limit boundary.
            for index in indexes.into_iter() {
                let (datums, mut diff) = source[index];
                if !diff.is_positive() {
                    continue;
                }
                if offset > 0 {
                    let to_skip =
                        std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                    offset -= to_skip;
                    diff -= Diff::try_from(to_skip).unwrap();
                }
                if let Some(limit) = &mut limit {
                    diff = std::cmp::min(diff, Diff::from(*limit));
                    *limit -= diff;
                }
                if diff.is_positive() {
                    target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                }
            }
        }
    });
    (arranged, reduced)
}
666
/// Thins a collection of `(group key, row)` updates within each timestamp:
/// for every (group, original time) pair only the `limit` best rows under
/// `order_key` survive, reducing the data downstream stateful operators see.
///
/// This is purely an intra-timestamp optimization: updates are buffered per
/// input capability, aggregated into `TopKBatch`es keyed by (group, record
/// time), and the survivors are emitted once the corresponding notification
/// fires.
fn render_intra_ts_thinning<S>(
    collection: Collection<S, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> Collection<S, (Row, Row), Diff>
where
    S: Scope,
    S::Timestamp: Lattice,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // Pending aggregates: capability time -> ((group, record time) -> batch).
    let mut aggregates = BTreeMap::new();
    // Comparison state shared by every `Top1MonoidLocal` built here, so each
    // monoid carries only a Row plus an Rc instead of its own order key.
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                while let Some((time, data)) = input.next() {
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.drain(..) {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Resolve the group's limit: literal fast path, else
                        // evaluate against the group key. NULL means
                        // unbounded; evaluation errors default to 0 (they are
                        // reported separately, in `render_topk`).
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    // Ask to be woken when this time is complete.
                    notificator.notify_at(time.retain());
                }

                // A completed time: emit each batch's survivors, stamped with
                // their original record times.
                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}
745
/// In-place bounded aggregation of top-k updates within a timestamp.
pub mod topk_agg {
    use differential_dataflow::consolidation;
    use smallvec::SmallVec;

    /// An accumulator of `(item, diff)` updates that retains at most `limit`
    /// effective records (counted by summed diffs) of the smallest items
    /// under `T`'s ordering.
    ///
    /// Updates are appended cheaply and compacted opportunistically; `clean`
    /// tracks how many leading entries are already consolidated.
    pub struct TopKBatch<T> {
        // Pending (item, diff) updates; a prefix of length `clean` is
        // consolidated, the suffix is raw appends.
        updates: SmallVec<[(T, i64); 16]>,
        // Length of the consolidated prefix of `updates`.
        clean: usize,
        // Budget of records to retain after consolidation.
        limit: i64,
    }

    impl<T: Ord> TopKBatch<T> {
        pub fn new(limit: i64) -> Self {
            Self {
                updates: SmallVec::new(),
                clean: 0,
                limit,
            }
        }

        /// Adds a new update, compacting if the unconsolidated suffix has
        /// grown large enough (see `maintain_bounds`).
        #[inline]
        pub fn update(&mut self, item: T, value: i64) {
            self.updates.push((item, value));
            self.maintain_bounds();
        }

        /// Consolidates the updates (sorting by item and summing diffs),
        /// then truncates the result to the first `limit` records.
        #[inline]
        pub fn compact(&mut self) {
            if self.clean < self.updates.len() && self.updates.len() > 1 {
                let len = consolidation::consolidate_slice(&mut self.updates);
                self.updates.truncate(len);

                // Walk in item order, spending the limit budget; drop all
                // entries once the budget is exhausted.
                let mut limit = self.limit;
                self.updates.retain(|x| {
                    if limit > 0 {
                        limit -= x.1;
                        true
                    } else {
                        false
                    }
                });
                // A negative remaining budget means the last retained entry
                // overshot the limit; shave the overshoot off its diff
                // (`-limit` is the positive excess).
                if limit < 0 {
                    if let Some(item) = self.updates.last_mut() {
                        item.1 -= -limit;
                    }
                }
            }
            self.clean = self.updates.len();
        }

        /// Compacts when the batch is non-trivial (> 32 entries) and at least
        /// half of the entries arrived since the last compaction.
        fn maintain_bounds(&mut self) {
            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
                self.compact()
            }
        }
    }

    impl<T: Ord> IntoIterator for TopKBatch<T> {
        type Item = (T, i64);
        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;

        /// Compacts before yielding so consumers see the final top-k set.
        fn into_iter(mut self) -> Self::IntoIter {
            self.compact();
            self.updates.into_iter()
        }
    }
}
843
/// Monoids used to compute top-1 results within differential difference
/// components.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A self-contained monoid carrying a candidate row and the order key it
    /// competes under; combining two monoids keeps the winner (the row that
    /// sorts first — see the `Semigroup` impl).
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        // Reuse existing allocations of `self` where possible.
        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Scaling a top-1 candidate by a positive count does not change
            // which row wins, so the monoid passes through unchanged.
            // Non-positive factors are invalid in this monotonic context.
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            // Both sides must carry the same order key for the comparison to
            // be meaningful.
            debug_assert_eq!(self.order_key, other.order_key);

            // Compare under the order key, breaking ties with the full row
            // ordering so results are deterministic.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            // Keep the smaller monoid: the row sorting first under the order
            // key is the group's top-1.
            let cmp = (*self).cmp(rhs);
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        fn is_zero(&self) -> bool {
            // A monoid always carries a candidate row; it never cancels out.
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    /// Columnar storage region for `Top1Monoid`, delegating to the regions of
    /// its two fields.
    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            // SAFETY: each inner region receives only its own field, under
            // the same caller obligations that `Region::copy` imposes here.
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Clone the iterator so each field region can size itself.
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// Mutable comparison state shared by all `Top1MonoidLocal`s of a single
    /// operator instance: the order key plus two scratch `DatumVec`s used to
    /// unpack rows during comparison without allocating.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A worker-local variant of `Top1Monoid` that shares its order key and
    /// scratch space via `Rc`, avoiding a per-monoid `Vec<ColumnOrder>`.
    /// Not serializable, so it must stay within a single worker.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        /// Consumes the monoid, yielding the winning row.
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    // Equality and hashing consider only the row: all monoids from one
    // operator instance point at the same `shared` state.
    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            // Comparison is only defined between monoids sharing state.
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            // Borrow the shared scratch vectors to unpack both rows without
            // allocating, then compare under the shared order key; ties are
            // broken by the full row ordering.
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            // As with `Top1Monoid`: keep the smaller (winning) row.
            let cmp = (*self).cmp(rhs);
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            // Never cancels: a monoid always carries a candidate row.
            false
        }
    }
}