1use columnar::Columnar;
15use differential_dataflow::IntoOwned;
16use differential_dataflow::containers::Columnation;
17use differential_dataflow::hashable::Hashable;
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
21use differential_dataflow::{AsCollection, Collection};
22use mz_compute_types::plan::top_k::{
23 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
24};
25use mz_expr::func::CastUint64ToInt64;
26use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc};
27use mz_ore::cast::CastFrom;
28use mz_ore::soft_assert_or_log;
29use mz_repr::{Datum, DatumVec, Diff, Row, ScalarType, SharedRow};
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::operator::CollectionExt;
32use std::cell::RefCell;
33use std::collections::BTreeMap;
34use std::rc::Rc;
35use timely::Container;
36use timely::container::{CapacityContainerBuilder, PushInto};
37use timely::dataflow::Scope;
38use timely::dataflow::channels::pact::Pipeline;
39use timely::dataflow::operators::Operator;
40
41use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
42use crate::extensions::reduce::MzReduce;
43use crate::render::Pairer;
44use crate::render::context::{CollectionBundle, Context};
45use crate::render::errors::MaybeValidatingRow;
46use crate::row_spine::{
47 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
48};
49use crate::typedefs::{KeyBatcher, RowRowSpine, RowSpine};
50
51impl<G> Context<G>
53where
54 G: Scope,
55 G::Timestamp: crate::render::RenderTimestamp,
56 <G::Timestamp as Columnar>::Container: Clone + Send,
57{
58 pub(crate) fn render_topk(
59 &self,
60 input: CollectionBundle<G>,
61 top_k_plan: TopKPlan,
62 ) -> CollectionBundle<G> {
63 let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);
64
65 let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
67 let ok_input = ok_input.enter_region(inner);
68 let mut err_collection = err_input.enter_region(inner);
69
70 let limit_err = match &top_k_plan {
77 TopKPlan::MonotonicTop1(MonotonicTop1Plan { .. }) => None,
78 TopKPlan::MonotonicTopK(MonotonicTopKPlan { limit, .. }) => Some(limit),
79 TopKPlan::Basic(BasicTopKPlan { limit, .. }) => Some(limit),
80 };
81 if let Some(limit) = limit_err {
82 if let Some(expr) = limit {
83 let expr = expr.clone();
89 let mut datum_vec = mz_repr::DatumVec::new();
90 let errors = ok_input.flat_map(move |row| {
91 let temp_storage = mz_repr::RowArena::new();
92 let datums = datum_vec.borrow_with(&row);
93 match expr.eval(&datums[..], &temp_storage) {
94 Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
95 Some(EvalError::NegLimit.into())
96 }
97 Ok(_) => None,
98 Err(e) => Some(e.into()),
99 }
100 });
101 err_collection = err_collection.concat(&errors);
102 }
103 }
104
105 let ok_result = match top_k_plan {
106 TopKPlan::MonotonicTop1(MonotonicTop1Plan {
107 group_key,
108 order_key,
109 must_consolidate,
110 }) => {
111 let (oks, errs) = self.render_top1_monotonic(
112 ok_input,
113 group_key,
114 order_key,
115 must_consolidate,
116 );
117 err_collection = err_collection.concat(&errs);
118 oks
119 }
120 TopKPlan::MonotonicTopK(MonotonicTopKPlan {
121 order_key,
122 group_key,
123 arity,
124 mut limit,
125 must_consolidate,
126 }) => {
127 if let Some(expr) = limit.as_mut() {
129 let mut map = BTreeMap::new();
130 for (index, column) in group_key.iter().enumerate() {
131 map.insert(*column, index);
132 }
133 expr.permute_map(&map);
134 }
135
136 let mut datum_vec = mz_repr::DatumVec::new();
138 let collection = ok_input
139 .map(move |row| {
140 let group_row = {
141 let datums = datum_vec.borrow_with(&row);
142 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
143 };
144 (group_row, row)
145 })
146 .consolidate_named_if::<KeyBatcher<_, _, _>>(
147 must_consolidate,
148 "Consolidated MonotonicTopK input",
149 );
150
151 let error_logger = self.error_logger();
153 let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
154 error_logger.log(
155 "Non-monotonic input to MonotonicTopK",
156 &format!("data={data:?}, diff={diff}"),
157 );
158 let m = "tried to build monotonic top-k on non-monotonic input".into();
159 (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
160 });
161 err_collection = err_collection.concat(&errs);
162
163 let collection = if let Some(limit) = limit.clone() {
169 render_intra_ts_thinning(collection, order_key.clone(), limit)
170 } else {
171 collection
172 };
173
174 let pairer = Pairer::new(1);
175 let collection = collection.map(move |(group_row, row)| {
176 let hash = row.hashed();
177 let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
178 (hash_key, row)
179 });
180
181 use differential_dataflow::operators::iterate::Variable;
190 let delay = std::time::Duration::from_secs(10);
191 let retractions = Variable::new(
192 &mut ok_input.scope(),
193 <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
194 delay.try_into().expect("must fit"),
195 ),
196 );
197 let thinned = collection.concat(&retractions.negate());
198
199 let (result, errs) =
205 self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
206 let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
208 "Monotonic TopK final consolidate",
209 );
210 retractions.set(&collection.concat(&result.negate()));
211 soft_assert_or_log!(
212 errs.is_none(),
213 "requested no validation, but received error collection"
214 );
215
216 result.map(|(_key_hash, row)| row)
217 }
218 TopKPlan::Basic(BasicTopKPlan {
219 group_key,
220 order_key,
221 offset,
222 mut limit,
223 arity,
224 buckets,
225 }) => {
226 if let Some(expr) = limit.as_mut() {
228 let mut map = BTreeMap::new();
229 for (index, column) in group_key.iter().enumerate() {
230 map.insert(*column, index);
231 }
232 expr.permute_map(&map);
233 }
234
235 let (oks, errs) = self.build_topk(
236 ok_input, group_key, order_key, offset, limit, arity, buckets,
237 );
238 err_collection = err_collection.concat(&errs);
239 oks
240 }
241 };
242
243 (ok_result.leave_region(), err_collection.leave_region())
245 });
246
247 CollectionBundle::from_collections(ok_result, err_collection)
248 }
249
250 fn build_topk<S>(
252 &self,
253 collection: Collection<S, Row, Diff>,
254 group_key: Vec<usize>,
255 order_key: Vec<mz_expr::ColumnOrder>,
256 offset: usize,
257 limit: Option<mz_expr::MirScalarExpr>,
258 arity: usize,
259 buckets: Vec<u64>,
260 ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
261 where
262 S: Scope<Timestamp = G::Timestamp>,
263 {
264 let pairer = Pairer::new(1);
265 let mut datum_vec = mz_repr::DatumVec::new();
266 let mut collection = collection.map({
267 move |row| {
268 let group_row = {
269 let row_hash = row.hashed();
270 let datums = datum_vec.borrow_with(&row);
271 let iterator = group_key.iter().map(|i| datums[*i]);
272 pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
273 };
274 (group_row, row)
275 }
276 });
277
278 let mut validating = true;
279 let mut err_collection: Option<Collection<S, _, _>> = None;
280
281 if let Some(mut limit) = limit.clone() {
282 if offset > 0 {
285 let new_limit = (|| {
286 let limit = limit.as_literal_int64()?;
287 let offset = i64::try_from(offset).ok()?;
288 limit.checked_add(offset)
289 })();
290
291 if let Some(new_limit) = new_limit {
292 limit = MirScalarExpr::literal_ok(Datum::Int64(new_limit), ScalarType::Int64);
293 } else {
294 limit = limit.call_binary(
295 MirScalarExpr::literal_ok(
296 Datum::UInt64(u64::cast_from(offset)),
297 ScalarType::UInt64,
298 )
299 .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
300 BinaryFunc::AddInt64,
301 );
302 }
303 }
304
305 for bucket in buckets.into_iter() {
309 let (oks, errs) = self.build_topk_stage(
313 collection,
314 order_key.clone(),
315 bucket,
316 0,
317 Some(limit.clone()),
318 arity,
319 validating,
320 );
321 collection = oks;
322 if validating {
323 err_collection = errs;
324 validating = false;
325 }
326 }
327 }
328
329 let (oks, errs) = self.build_topk_stage(
333 collection, order_key, 1u64, offset, limit, arity, validating,
334 );
335 let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
337 collection = oks;
338 if validating {
339 err_collection = errs;
340 }
341 (
342 collection.map(|(_key_hash, row)| row),
343 err_collection.expect("at least one stage validated its inputs"),
344 )
345 }
346
347 fn build_topk_stage<S>(
380 &self,
381 collection: Collection<S, (Row, Row), Diff>,
382 order_key: Vec<mz_expr::ColumnOrder>,
383 modulus: u64,
384 offset: usize,
385 limit: Option<mz_expr::MirScalarExpr>,
386 arity: usize,
387 validating: bool,
388 ) -> (
389 Collection<S, (Row, Row), Diff>,
390 Option<Collection<S, DataflowError, Diff>>,
391 )
392 where
393 S: Scope<Timestamp = G::Timestamp>,
394 {
395 let input = collection.map(move |(hash_key, row)| {
398 let mut hash_key_iter = hash_key.iter();
399 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
400 let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
401 (hash_key, row)
402 });
403
404 let (input, oks, errs) = if validating {
406 let (input, stage) = build_topk_negated_stage::<
408 S,
409 _,
410 RowValBuilder<_, _, _>,
411 RowValSpine<Result<Row, Row>, _, _>,
412 >(&input, order_key, offset, limit, arity);
413 let stage = stage.as_collection(|k, v| (k.into_owned(), v.clone()));
414
415 let error_logger = self.error_logger();
417 type CB<C> = CapacityContainerBuilder<C>;
418 let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
419 "Demuxing Errors",
420 move |(hk, result)| match result {
421 Err(v) => {
422 let mut hk_iter = hk.iter();
423 let h = hk_iter.next().unwrap().unwrap_uint64();
424 let k = SharedRow::pack(hk_iter);
425 let message = "Negative multiplicities in TopK";
426 error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
427 Err(EvalError::Internal(message.into()).into())
428 }
429 Ok(t) => Ok((hk, t)),
430 },
431 );
432 (input, oks, Some(errs))
433 } else {
434 let (input, stage) =
436 build_topk_negated_stage::<S, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
437 &input, order_key, offset, limit, arity,
438 );
439 let stage = stage.as_collection(|k, v| (k.into_owned(), v.into_owned()));
441
442 (input, stage, None)
443 };
444 let input = input.as_collection(|k, v| (k.into_owned(), v.into_owned()));
445 (oks.concat(&input), errs)
446 }
447
448 fn render_top1_monotonic<S>(
449 &self,
450 collection: Collection<S, Row, Diff>,
451 group_key: Vec<usize>,
452 order_key: Vec<mz_expr::ColumnOrder>,
453 must_consolidate: bool,
454 ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
455 where
456 S: Scope<Timestamp = G::Timestamp>,
457 {
458 let collection = collection
463 .map({
464 let mut datum_vec = mz_repr::DatumVec::new();
465 move |row| {
466 let group_key = {
468 let datums = datum_vec.borrow_with(&row);
469 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
470 };
471 (group_key, row)
472 }
473 })
474 .consolidate_named_if::<KeyBatcher<_, _, _>>(
475 must_consolidate,
476 "Consolidated MonotonicTop1 input",
477 );
478
479 let error_logger = self.error_logger();
481 let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
482 error_logger.log(
483 "Non-monotonic input to MonotonicTop1",
484 &format!("data={data:?}, diff={diff}"),
485 );
486 let m = "tried to build monotonic top-1 on non-monotonic input".into();
487 (EvalError::Internal(m).into(), Diff::ONE)
488 });
489 let partial: KeyCollection<_, _, _> = partial
490 .explode_one(move |(group_key, row)| {
491 (
492 group_key,
493 monoids::Top1Monoid {
494 row,
495 order_key: order_key.clone(),
496 },
497 )
498 })
499 .into();
500 let result = partial
501 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
502 "Arranged MonotonicTop1 partial [val: empty]",
503 )
504 .mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
505 "MonotonicTop1",
506 move |_key, input, output| {
507 let accum: &monoids::Top1Monoid = &input[0].1;
508 output.push((accum.row.clone(), Diff::ONE));
509 },
510 );
511 (result.as_collection(|_k, v| v.into_owned()), errs)
513 }
514}
515
516fn build_topk_negated_stage<G, V, Bu, Tr>(
524 input: &Collection<G, (Row, Row), Diff>,
525 order_key: Vec<mz_expr::ColumnOrder>,
526 offset: usize,
527 limit: Option<mz_expr::MirScalarExpr>,
528 arity: usize,
529) -> (
530 Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
531 Arranged<G, TraceAgent<Tr>>,
532)
533where
534 G: Scope,
535 G::Timestamp: Lattice + Columnation,
536 V: MaybeValidatingRow<Row, Row>,
537 Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
538 Bu::Input: Container + PushInto<((Row, V), G::Timestamp, Diff)>,
539 Tr: Trace
540 + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff>
541 + 'static,
542 for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
543 Tr::Batch: Batch,
544 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
545{
546 let mut datum_vec = mz_repr::DatumVec::new();
547
548 let arranged = input.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
553 "Arranged TopK input",
554 );
555
556 let reduced = arranged.mz_reduce_abelian::<_, _, _, Bu, Tr>("Reduced TopK input", {
557 move |mut hash_key, source, target: &mut Vec<(V, Diff)>| {
558 let limit: Option<Diff> = limit.as_ref().map(|l| {
560 if let Some(l) = l.as_literal_int64() {
561 l.into()
562 } else {
563 let temp_storage = mz_repr::RowArena::new();
566 let _hash = hash_key.next();
567 let mut key_datums = datum_vec.borrow();
568 key_datums.extend(hash_key);
569 let datum_limit = l
570 .eval(&key_datums, &temp_storage)
571 .unwrap_or(Datum::Int64(0));
572 if datum_limit == Datum::Null {
573 Diff::MAX
574 } else {
575 datum_limit.unwrap_int64().into()
576 }
577 }
578 });
579
580 if let Some(err) = V::into_error() {
581 for (datums, diff) in source.iter() {
582 if diff.is_positive() {
583 continue;
584 }
585 target.push((err((*datums).into_owned()), Diff::ONE));
586 return;
587 }
588 }
589
590 let must_shrink = offset > 0
592 || limit
593 .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
594 .unwrap_or(false);
595 if !must_shrink {
596 return;
597 }
598
599 target.reserve(source.len());
603 for (datums, diff) in source.iter() {
604 target.push((V::ok((*datums).into_owned()), -diff));
605 }
606 let mut offset = offset;
608 let mut limit = limit;
609
610 let mut indexes = (0..source.len()).collect::<Vec<_>>();
612 let mut buffer = datum_vec.borrow();
615 for (index, (datums, _)) in source.iter().enumerate() {
616 buffer.extend(*datums);
617 assert_eq!(buffer.len(), arity * (index + 1));
618 }
619 let width = buffer.len() / source.len();
620
621 indexes.sort_by(|left, right| {
623 let left = &buffer[left * width..][..width];
624 let right = &buffer[right * width..][..width];
625 mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
628 });
629
630 for index in indexes.into_iter() {
633 let (datums, mut diff) = source[index];
634 if !diff.is_positive() {
635 continue;
636 }
637 if offset > 0 {
639 let to_skip =
640 std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
641 offset -= to_skip;
642 diff -= Diff::try_from(to_skip).unwrap();
643 }
644 if let Some(limit) = &mut limit {
646 diff = std::cmp::min(diff, Diff::from(*limit));
647 *limit -= diff;
648 }
649 if diff.is_positive() {
651 target.push((V::ok(datums.into_owned()), diff));
654 }
655 }
656 }
657 });
658 (arranged, reduced)
659}
660
661fn render_intra_ts_thinning<S>(
662 collection: Collection<S, (Row, Row), Diff>,
663 order_key: Vec<mz_expr::ColumnOrder>,
664 limit: mz_expr::MirScalarExpr,
665) -> Collection<S, (Row, Row), Diff>
666where
667 S: Scope,
668 S::Timestamp: Lattice,
669{
670 let mut datum_vec = mz_repr::DatumVec::new();
671
672 let mut aggregates = BTreeMap::new();
673 let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
674 order_key,
675 left: DatumVec::new(),
676 right: DatumVec::new(),
677 }));
678 collection
679 .inner
680 .unary_notify(
681 Pipeline,
682 "TopKIntraTimeThinning",
683 [],
684 move |input, output, notificator| {
685 while let Some((time, data)) = input.next() {
686 let agg_time = aggregates
687 .entry(time.time().clone())
688 .or_insert_with(BTreeMap::new);
689 for ((grp_row, row), record_time, diff) in data.drain(..) {
690 let monoid = monoids::Top1MonoidLocal {
691 row,
692 shared: Rc::clone(&shared),
693 };
694
695 let limit = if let Some(l) = limit.as_literal_int64() {
697 l
698 } else {
699 let temp_storage = mz_repr::RowArena::new();
700 let key_datums = datum_vec.borrow_with(&grp_row);
701 let datum_limit = limit
704 .eval(&key_datums, &temp_storage)
705 .unwrap_or(mz_repr::Datum::Int64(0));
706 if datum_limit == Datum::Null {
707 i64::MAX
708 } else {
709 datum_limit.unwrap_int64()
710 }
711 };
712
713 let topk = agg_time
714 .entry((grp_row, record_time))
715 .or_insert_with(move || topk_agg::TopKBatch::new(limit));
716 topk.update(monoid, diff.into_inner());
717 }
718 notificator.notify_at(time.retain());
719 }
720
721 notificator.for_each(|time, _, _| {
722 if let Some(aggs) = aggregates.remove(time.time()) {
723 let mut session = output.session(&time);
724 for ((grp_row, record_time), topk) in aggs {
725 session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
726 (
727 (grp_row.clone(), monoid.into_row()),
728 record_time.clone(),
729 diff.into(),
730 )
731 }))
732 }
733 }
734 });
735 },
736 )
737 .as_collection()
738}
739
740pub mod topk_agg {
742 use differential_dataflow::consolidation;
743 use smallvec::SmallVec;
744
745 pub struct TopKBatch<T> {
752 updates: SmallVec<[(T, i64); 16]>,
753 clean: usize,
754 limit: i64,
755 }
756
757 impl<T: Ord> TopKBatch<T> {
758 pub fn new(limit: i64) -> Self {
759 Self {
760 updates: SmallVec::new(),
761 clean: 0,
762 limit,
763 }
764 }
765
766 #[inline]
773 pub fn update(&mut self, item: T, value: i64) {
774 self.updates.push((item, value));
775 self.maintain_bounds();
776 }
777
778 #[inline]
784 pub fn compact(&mut self) {
785 if self.clean < self.updates.len() && self.updates.len() > 1 {
786 let len = consolidation::consolidate_slice(&mut self.updates);
787 self.updates.truncate(len);
788
789 let mut limit = self.limit;
791 self.updates.retain(|x| {
792 if limit > 0 {
793 limit -= x.1;
794 true
795 } else {
796 false
797 }
798 });
799 if limit < 0 {
807 if let Some(item) = self.updates.last_mut() {
808 item.1 -= -limit;
811 }
812 }
813 }
814 self.clean = self.updates.len();
815 }
816
817 fn maintain_bounds(&mut self) {
820 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
822 self.compact()
823 }
824 }
825 }
826
827 impl<T: Ord> IntoIterator for TopKBatch<T> {
828 type Item = (T, i64);
829 type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
830
831 fn into_iter(mut self) -> Self::IntoIter {
832 self.compact();
833 self.updates.into_iter()
834 }
835 }
836}
837
838pub mod monoids {
840 use std::cell::RefCell;
841 use std::cmp::Ordering;
842 use std::hash::{Hash, Hasher};
843 use std::rc::Rc;
844
845 use differential_dataflow::containers::{Columnation, Region};
846 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
847 use mz_expr::ColumnOrder;
848 use mz_repr::{DatumVec, Diff, Row};
849 use serde::{Deserialize, Serialize};
850
851 #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash, Default)]
853 pub struct Top1Monoid {
854 pub row: Row,
855 pub order_key: Vec<ColumnOrder>,
856 }
857
858 impl Multiply<Diff> for Top1Monoid {
859 type Output = Self;
860
861 fn multiply(self, factor: &Diff) -> Self {
862 assert!(factor.is_positive());
867 self
868 }
869 }
870
871 impl Ord for Top1Monoid {
872 fn cmp(&self, other: &Self) -> Ordering {
873 debug_assert_eq!(self.order_key, other.order_key);
874
875 let left: Vec<_> = self.row.unpack();
878 let right: Vec<_> = other.row.unpack();
879 mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
880 }
881 }
882 impl PartialOrd for Top1Monoid {
883 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
884 Some(self.cmp(other))
885 }
886 }
887
888 impl Semigroup for Top1Monoid {
889 fn plus_equals(&mut self, rhs: &Self) {
890 let cmp = (*self).cmp(rhs);
891 if cmp == Ordering::Greater {
893 self.clone_from(rhs);
894 }
895 }
896 }
897
898 impl IsZero for Top1Monoid {
899 fn is_zero(&self) -> bool {
900 false
901 }
902 }
903
904 impl Columnation for Top1Monoid {
905 type InnerRegion = Top1MonoidRegion;
906 }
907
908 #[derive(Default)]
909 pub struct Top1MonoidRegion {
910 row_region: <Row as Columnation>::InnerRegion,
911 order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
912 }
913
914 impl Region for Top1MonoidRegion {
915 type Item = Top1Monoid;
916
917 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
918 let row = unsafe { self.row_region.copy(&item.row) };
919 let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
920 Self::Item { row, order_key }
921 }
922
923 fn clear(&mut self) {
924 self.row_region.clear();
925 self.order_key_region.clear();
926 }
927
928 fn reserve_items<'a, I>(&mut self, items1: I)
929 where
930 Self: 'a,
931 I: Iterator<Item = &'a Self::Item> + Clone,
932 {
933 let items2 = items1.clone();
934 self.row_region
935 .reserve_items(items1.into_iter().map(|s| &s.row));
936 self.order_key_region
937 .reserve_items(items2.into_iter().map(|s| &s.order_key));
938 }
939
940 fn reserve_regions<'a, I>(&mut self, regions1: I)
941 where
942 Self: 'a,
943 I: Iterator<Item = &'a Self> + Clone,
944 {
945 let regions2 = regions1.clone();
946 self.row_region
947 .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
948 self.order_key_region
949 .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
950 }
951
952 fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
953 self.row_region.heap_size(&mut callback);
954 self.order_key_region.heap_size(callback);
955 }
956 }
957
958 #[derive(Debug)]
960 pub struct Top1MonoidShared {
961 pub order_key: Vec<ColumnOrder>,
962 pub left: DatumVec,
963 pub right: DatumVec,
964 }
965
966 #[derive(Debug, Clone)]
969 pub struct Top1MonoidLocal {
970 pub row: Row,
971 pub shared: Rc<RefCell<Top1MonoidShared>>,
972 }
973
974 impl Top1MonoidLocal {
975 pub fn into_row(self) -> Row {
976 self.row
977 }
978 }
979
980 impl PartialEq for Top1MonoidLocal {
981 fn eq(&self, other: &Self) -> bool {
982 self.row.eq(&other.row)
983 }
984 }
985
986 impl Eq for Top1MonoidLocal {}
987
988 impl Hash for Top1MonoidLocal {
989 fn hash<H: Hasher>(&self, state: &mut H) {
990 self.row.hash(state);
991 }
992 }
993
994 impl Ord for Top1MonoidLocal {
995 fn cmp(&self, other: &Self) -> Ordering {
996 debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
997 let Top1MonoidShared {
998 left,
999 right,
1000 order_key,
1001 } = &mut *self.shared.borrow_mut();
1002
1003 let left = left.borrow_with(&self.row);
1004 let right = right.borrow_with(&other.row);
1005 mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
1006 }
1007 }
1008
1009 impl PartialOrd for Top1MonoidLocal {
1010 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1011 Some(self.cmp(other))
1012 }
1013 }
1014
1015 impl Semigroup for Top1MonoidLocal {
1016 fn plus_equals(&mut self, rhs: &Self) {
1017 let cmp = (*self).cmp(rhs);
1018 if cmp == Ordering::Greater {
1020 self.clone_from(rhs);
1021 }
1022 }
1023 }
1024
1025 impl IsZero for Top1MonoidLocal {
1026 fn is_zero(&self) -> bool {
1027 false
1028 }
1029 }
1030}