1use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::SemigroupVariable;
23use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::plan::top_k::{
27 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
28};
29use mz_expr::func::CastUint64ToInt64;
30use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
31use mz_ore::cast::CastFrom;
32use mz_ore::soft_assert_or_log;
33use mz_repr::{Datum, DatumVec, Diff, Row, SharedRow, SqlScalarType};
34use mz_storage_types::errors::DataflowError;
35use mz_timely_util::operator::CollectionExt;
36use timely::Container;
37use timely::container::{CapacityContainerBuilder, PushInto};
38use timely::dataflow::Scope;
39use timely::dataflow::channels::pact::Pipeline;
40use timely::dataflow::operators::Operator;
41
42use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
43use crate::extensions::reduce::MzReduce;
44use crate::render::Pairer;
45use crate::render::context::{CollectionBundle, Context};
46use crate::render::errors::MaybeValidatingRow;
47use crate::row_spine::{
48 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
49};
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51
52impl<G> Context<G>
54where
55 G: Scope,
56 G::Timestamp: crate::render::RenderTimestamp,
57{
58 pub(crate) fn render_topk(
59 &self,
60 input: CollectionBundle<G>,
61 top_k_plan: TopKPlan,
62 ) -> CollectionBundle<G> {
63 let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);
64
65 let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
67 let ok_input = ok_input.enter_region(inner);
68 let mut err_collection = err_input.enter_region(inner);
69
70 match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
78 None => {}
79 Some((Some(Ok(literal)), _))
80 if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
81 Some((_, expr)) => {
82 let expr = expr.clone();
88 let mut datum_vec = mz_repr::DatumVec::new();
89 let errors = ok_input.flat_map(move |row| {
90 let temp_storage = mz_repr::RowArena::new();
91 let datums = datum_vec.borrow_with(&row);
92 match expr.eval(&datums[..], &temp_storage) {
93 Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
94 Some(EvalError::NegLimit.into())
95 }
96 Ok(_) => None,
97 Err(e) => Some(e.into()),
98 }
99 });
100 err_collection = err_collection.concat(&errors);
101 }
102 }
103
104 let ok_result = match top_k_plan {
105 TopKPlan::MonotonicTop1(MonotonicTop1Plan {
106 group_key,
107 order_key,
108 must_consolidate,
109 }) => {
110 let (oks, errs) = self.render_top1_monotonic(
111 ok_input,
112 group_key,
113 order_key,
114 must_consolidate,
115 );
116 err_collection = err_collection.concat(&errs);
117 oks
118 }
119 TopKPlan::MonotonicTopK(MonotonicTopKPlan {
120 order_key,
121 group_key,
122 arity,
123 mut limit,
124 must_consolidate,
125 }) => {
126 if let Some(expr) = limit.as_mut() {
128 let mut map = BTreeMap::new();
129 for (index, column) in group_key.iter().enumerate() {
130 map.insert(*column, index);
131 }
132 expr.permute_map(&map);
133 }
134
135 let mut datum_vec = mz_repr::DatumVec::new();
137 let collection = ok_input
138 .map(move |row| {
139 let group_row = {
140 let datums = datum_vec.borrow_with(&row);
141 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
142 };
143 (group_row, row)
144 })
145 .consolidate_named_if::<KeyBatcher<_, _, _>>(
146 must_consolidate,
147 "Consolidated MonotonicTopK input",
148 );
149
150 let error_logger = self.error_logger();
152 let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
153 error_logger.log(
154 "Non-monotonic input to MonotonicTopK",
155 &format!("data={data:?}, diff={diff}"),
156 );
157 let m = "tried to build monotonic top-k on non-monotonic input".into();
158 (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
159 });
160 err_collection = err_collection.concat(&errs);
161
162 let collection = if let Some(limit) = limit.clone() {
168 render_intra_ts_thinning(collection, order_key.clone(), limit)
169 } else {
170 collection
171 };
172
173 let pairer = Pairer::new(1);
174 let collection = collection.map(move |(group_row, row)| {
175 let hash = row.hashed();
176 let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
177 (hash_key, row)
178 });
179
180 let delay = std::time::Duration::from_secs(10);
189 let retractions = SemigroupVariable::new(
190 &mut ok_input.scope(),
191 <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
192 delay.try_into().expect("must fit"),
193 ),
194 );
195 let thinned = collection.concat(&retractions.negate());
196
197 let (result, errs) =
203 self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
204 let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
206 "Monotonic TopK final consolidate",
207 );
208 retractions.set(&collection.concat(&result.negate()));
209 soft_assert_or_log!(
210 errs.is_none(),
211 "requested no validation, but received error collection"
212 );
213
214 result.map(|(_key_hash, row)| row)
215 }
216 TopKPlan::Basic(BasicTopKPlan {
217 group_key,
218 order_key,
219 offset,
220 mut limit,
221 arity,
222 buckets,
223 }) => {
224 if let Some(expr) = limit.as_mut() {
226 let mut map = BTreeMap::new();
227 for (index, column) in group_key.iter().enumerate() {
228 map.insert(*column, index);
229 }
230 expr.permute_map(&map);
231 }
232
233 let (oks, errs) = self.build_topk(
234 ok_input, group_key, order_key, offset, limit, arity, buckets,
235 );
236 err_collection = err_collection.concat(&errs);
237 oks
238 }
239 };
240
241 (ok_result.leave_region(), err_collection.leave_region())
243 });
244
245 CollectionBundle::from_collections(ok_result, err_collection)
246 }
247
248 fn build_topk<S>(
250 &self,
251 collection: VecCollection<S, Row, Diff>,
252 group_key: Vec<usize>,
253 order_key: Vec<mz_expr::ColumnOrder>,
254 offset: usize,
255 limit: Option<mz_expr::MirScalarExpr>,
256 arity: usize,
257 buckets: Vec<u64>,
258 ) -> (
259 VecCollection<S, Row, Diff>,
260 VecCollection<S, DataflowError, Diff>,
261 )
262 where
263 S: Scope<Timestamp = G::Timestamp>,
264 {
265 let pairer = Pairer::new(1);
266 let mut datum_vec = mz_repr::DatumVec::new();
267 let mut collection = collection.map({
268 move |row| {
269 let group_row = {
270 let row_hash = row.hashed();
271 let datums = datum_vec.borrow_with(&row);
272 let iterator = group_key.iter().map(|i| datums[*i]);
273 pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
274 };
275 (group_row, row)
276 }
277 });
278
279 let mut validating = true;
280 let mut err_collection: Option<VecCollection<S, _, _>> = None;
281
282 if let Some(mut limit) = limit.clone() {
283 if offset > 0 {
286 let new_limit = (|| {
287 let limit = limit.as_literal_int64()?;
288 let offset = i64::try_from(offset).ok()?;
289 limit.checked_add(offset)
290 })();
291
292 if let Some(new_limit) = new_limit {
293 limit =
294 MirScalarExpr::literal_ok(Datum::Int64(new_limit), SqlScalarType::Int64);
295 } else {
296 limit = limit.call_binary(
297 MirScalarExpr::literal_ok(
298 Datum::UInt64(u64::cast_from(offset)),
299 SqlScalarType::UInt64,
300 )
301 .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
302 BinaryFunc::AddInt64(func::AddInt64),
303 );
304 }
305 }
306
307 for bucket in buckets.into_iter() {
311 let (oks, errs) = self.build_topk_stage(
315 collection,
316 order_key.clone(),
317 bucket,
318 0,
319 Some(limit.clone()),
320 arity,
321 validating,
322 );
323 collection = oks;
324 if validating {
325 err_collection = errs;
326 validating = false;
327 }
328 }
329 }
330
331 let (oks, errs) = self.build_topk_stage(
335 collection, order_key, 1u64, offset, limit, arity, validating,
336 );
337 let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
339 collection = oks;
340 if validating {
341 err_collection = errs;
342 }
343 (
344 collection.map(|(_key_hash, row)| row),
345 err_collection.expect("at least one stage validated its inputs"),
346 )
347 }
348
349 fn build_topk_stage<S>(
382 &self,
383 collection: VecCollection<S, (Row, Row), Diff>,
384 order_key: Vec<mz_expr::ColumnOrder>,
385 modulus: u64,
386 offset: usize,
387 limit: Option<mz_expr::MirScalarExpr>,
388 arity: usize,
389 validating: bool,
390 ) -> (
391 VecCollection<S, (Row, Row), Diff>,
392 Option<VecCollection<S, DataflowError, Diff>>,
393 )
394 where
395 S: Scope<Timestamp = G::Timestamp>,
396 {
397 let input = collection.map(move |(hash_key, row)| {
400 let mut hash_key_iter = hash_key.iter();
401 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
402 let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
403 (hash_key, row)
404 });
405
406 let (input, oks, errs) = if validating {
408 let (input, stage) = build_topk_negated_stage::<
410 S,
411 RowValBuilder<_, _, _>,
412 RowValSpine<Result<Row, Row>, _, _>,
413 >(&input, order_key, offset, limit, arity);
414 let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));
415
416 let error_logger = self.error_logger();
418 type CB<C> = CapacityContainerBuilder<C>;
419 let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
420 "Demuxing Errors",
421 move |(hk, result)| match result {
422 Err(v) => {
423 let mut hk_iter = hk.iter();
424 let h = hk_iter.next().unwrap().unwrap_uint64();
425 let k = SharedRow::pack(hk_iter);
426 let message = "Negative multiplicities in TopK";
427 error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
428 Err(EvalError::Internal(message.into()).into())
429 }
430 Ok(t) => Ok((hk, t)),
431 },
432 );
433 (input, oks, Some(errs))
434 } else {
435 let (input, stage) =
437 build_topk_negated_stage::<S, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
438 &input, order_key, offset, limit, arity,
439 );
440 let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));
442
443 (input, stage, None)
444 };
445 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
446 (oks.concat(&input), errs)
447 }
448
449 fn render_top1_monotonic<S>(
450 &self,
451 collection: VecCollection<S, Row, Diff>,
452 group_key: Vec<usize>,
453 order_key: Vec<mz_expr::ColumnOrder>,
454 must_consolidate: bool,
455 ) -> (
456 VecCollection<S, Row, Diff>,
457 VecCollection<S, DataflowError, Diff>,
458 )
459 where
460 S: Scope<Timestamp = G::Timestamp>,
461 {
462 let collection = collection
467 .map({
468 let mut datum_vec = mz_repr::DatumVec::new();
469 move |row| {
470 let group_key = {
472 let datums = datum_vec.borrow_with(&row);
473 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
474 };
475 (group_key, row)
476 }
477 })
478 .consolidate_named_if::<KeyBatcher<_, _, _>>(
479 must_consolidate,
480 "Consolidated MonotonicTop1 input",
481 );
482
483 let error_logger = self.error_logger();
485 let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
486 error_logger.log(
487 "Non-monotonic input to MonotonicTop1",
488 &format!("data={data:?}, diff={diff}"),
489 );
490 let m = "tried to build monotonic top-1 on non-monotonic input".into();
491 (EvalError::Internal(m).into(), Diff::ONE)
492 });
493 let partial: KeyCollection<_, _, _> = partial
494 .explode_one(move |(group_key, row)| {
495 (
496 group_key,
497 monoids::Top1Monoid {
498 row,
499 order_key: order_key.clone(),
500 },
501 )
502 })
503 .into();
504 let result = partial
505 .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
506 "Arranged MonotonicTop1 partial [val: empty]",
507 )
508 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
509 "MonotonicTop1",
510 move |_key, input, output| {
511 let accum: &monoids::Top1Monoid = &input[0].1;
512 output.push((accum.row.clone(), Diff::ONE));
513 },
514 );
515 (result.as_collection(|_k, v| v.to_row()), errs)
517 }
518}
519
520fn build_topk_negated_stage<G, Bu, Tr>(
528 input: &VecCollection<G, (Row, Row), Diff>,
529 order_key: Vec<mz_expr::ColumnOrder>,
530 offset: usize,
531 limit: Option<mz_expr::MirScalarExpr>,
532 arity: usize,
533) -> (
534 Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
535 Arranged<G, TraceAgent<Tr>>,
536)
537where
538 G: Scope,
539 G::Timestamp: MzTimestamp,
540 Bu: Builder<
541 Time = G::Timestamp,
542 Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), G::Timestamp, Diff)>,
543 Output = Tr::Batch,
544 >,
545 Tr: for<'a> Trace<
546 Key<'a> = DatumSeq<'a>,
547 KeyOwn = Row,
548 ValOwn: Data + MaybeValidatingRow<Row, Row>,
549 Time = G::Timestamp,
550 Diff = Diff,
551 > + 'static,
552 Arranged<G, TraceAgent<Tr>>: ArrangementSize,
553{
554 let mut datum_vec = mz_repr::DatumVec::new();
555
556 let arranged = input.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
561 "Arranged TopK input",
562 );
563
564 let limit = limit.map(|l| match l.as_literal() {
566 Some(Ok(Datum::Null)) => Ok(Diff::MAX),
567 Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
568 _ => Err(l),
569 });
570
571 let reduced = arranged.mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
572 move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
573 let limit = match &limit {
575 Some(Ok(lit)) => Some(*lit),
576 Some(Err(expr)) => {
577 let temp_storage = mz_repr::RowArena::new();
580 let _hash = hash_key.next();
581 let mut key_datums = datum_vec.borrow();
582 key_datums.extend(hash_key);
583 let datum_limit = expr
584 .eval(&key_datums, &temp_storage)
585 .unwrap_or(Datum::Int64(0));
586 Some(match datum_limit {
587 Datum::Null => Diff::MAX,
588 d => Diff::from(d.unwrap_int64()),
589 })
590 }
591 None => None,
592 };
593
594 if let Some(err) = Tr::ValOwn::into_error() {
595 for (datums, diff) in source.iter() {
596 if diff.is_positive() {
597 continue;
598 }
599 target.push((err((*datums).to_row()), Diff::ONE));
600 return;
601 }
602 }
603
604 let must_shrink = offset > 0
606 || limit
607 .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
608 .unwrap_or(false);
609 if !must_shrink {
610 return;
611 }
612
613 target.reserve(source.len());
617 for (datums, diff) in source.iter() {
618 target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
619 }
620 let mut offset = offset;
622 let mut limit = limit;
623
624 let mut indexes = (0..source.len()).collect::<Vec<_>>();
626 let mut buffer = datum_vec.borrow();
629 for (index, (datums, _)) in source.iter().enumerate() {
630 buffer.extend(*datums);
631 assert_eq!(buffer.len(), arity * (index + 1));
632 }
633 let width = buffer.len() / source.len();
634
635 indexes.sort_by(|left, right| {
637 let left = &buffer[left * width..][..width];
638 let right = &buffer[right * width..][..width];
639 mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
642 });
643
644 for index in indexes.into_iter() {
647 let (datums, mut diff) = source[index];
648 if !diff.is_positive() {
649 continue;
650 }
651 if offset > 0 {
653 let to_skip =
654 std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
655 offset -= to_skip;
656 diff -= Diff::try_from(to_skip).unwrap();
657 }
658 if let Some(limit) = &mut limit {
660 diff = std::cmp::min(diff, Diff::from(*limit));
661 *limit -= diff;
662 }
663 if diff.is_positive() {
665 target.push((Tr::ValOwn::ok(datums.to_row()), diff));
668 }
669 }
670 }
671 });
672 (arranged, reduced)
673}
674
675fn render_intra_ts_thinning<S>(
676 collection: VecCollection<S, (Row, Row), Diff>,
677 order_key: Vec<mz_expr::ColumnOrder>,
678 limit: mz_expr::MirScalarExpr,
679) -> VecCollection<S, (Row, Row), Diff>
680where
681 S: Scope,
682 S::Timestamp: Lattice,
683{
684 let mut datum_vec = mz_repr::DatumVec::new();
685
686 let mut aggregates = BTreeMap::new();
687 let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
688 order_key,
689 left: DatumVec::new(),
690 right: DatumVec::new(),
691 }));
692 collection
693 .inner
694 .unary_notify(
695 Pipeline,
696 "TopKIntraTimeThinning",
697 [],
698 move |input, output, notificator| {
699 input.for_each_time(|time, data| {
700 let agg_time = aggregates
701 .entry(time.time().clone())
702 .or_insert_with(BTreeMap::new);
703 for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
704 {
705 let monoid = monoids::Top1MonoidLocal {
706 row,
707 shared: Rc::clone(&shared),
708 };
709
710 let limit = if let Some(l) = limit.as_literal_int64() {
712 l
713 } else {
714 let temp_storage = mz_repr::RowArena::new();
715 let key_datums = datum_vec.borrow_with(&grp_row);
716 let datum_limit = limit
719 .eval(&key_datums, &temp_storage)
720 .unwrap_or(mz_repr::Datum::Int64(0));
721 if datum_limit == Datum::Null {
722 i64::MAX
723 } else {
724 datum_limit.unwrap_int64()
725 }
726 };
727
728 let topk = agg_time
729 .entry((grp_row, record_time))
730 .or_insert_with(move || topk_agg::TopKBatch::new(limit));
731 topk.update(monoid, diff.into_inner());
732 }
733 notificator.notify_at(time.retain());
734 });
735
736 notificator.for_each(|time, _, _| {
737 if let Some(aggs) = aggregates.remove(time.time()) {
738 let mut session = output.session(&time);
739 for ((grp_row, record_time), topk) in aggs {
740 session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
741 (
742 (grp_row.clone(), monoid.into_row()),
743 record_time.clone(),
744 diff.into(),
745 )
746 }))
747 }
748 }
749 });
750 },
751 )
752 .as_collection()
753}
754
755pub mod topk_agg {
757 use differential_dataflow::consolidation;
758 use smallvec::SmallVec;
759
760 pub struct TopKBatch<T> {
767 updates: SmallVec<[(T, i64); 16]>,
768 clean: usize,
769 limit: i64,
770 }
771
772 impl<T: Ord> TopKBatch<T> {
773 pub fn new(limit: i64) -> Self {
774 Self {
775 updates: SmallVec::new(),
776 clean: 0,
777 limit,
778 }
779 }
780
781 #[inline]
788 pub fn update(&mut self, item: T, value: i64) {
789 self.updates.push((item, value));
790 self.maintain_bounds();
791 }
792
793 #[inline]
799 pub fn compact(&mut self) {
800 if self.clean < self.updates.len() && self.updates.len() > 1 {
801 let len = consolidation::consolidate_slice(&mut self.updates);
802 self.updates.truncate(len);
803
804 let mut limit = self.limit;
806 self.updates.retain(|x| {
807 if limit > 0 {
808 limit -= x.1;
809 true
810 } else {
811 false
812 }
813 });
814 if limit < 0 {
822 if let Some(item) = self.updates.last_mut() {
823 item.1 -= -limit;
826 }
827 }
828 }
829 self.clean = self.updates.len();
830 }
831
832 fn maintain_bounds(&mut self) {
835 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
837 self.compact()
838 }
839 }
840 }
841
842 impl<T: Ord> IntoIterator for TopKBatch<T> {
843 type Item = (T, i64);
844 type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
845
846 fn into_iter(mut self) -> Self::IntoIter {
847 self.compact();
848 self.updates.into_iter()
849 }
850 }
851}
852
853pub mod monoids {
855 use std::cell::RefCell;
856 use std::cmp::Ordering;
857 use std::hash::{Hash, Hasher};
858 use std::rc::Rc;
859
860 use differential_dataflow::containers::{Columnation, Region};
861 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
862 use mz_expr::ColumnOrder;
863 use mz_repr::{DatumVec, Diff, Row};
864 use serde::{Deserialize, Serialize};
865
866 #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
868 pub struct Top1Monoid {
869 pub row: Row,
870 pub order_key: Vec<ColumnOrder>,
871 }
872
873 impl Clone for Top1Monoid {
874 #[inline]
875 fn clone(&self) -> Self {
876 Self {
877 row: self.row.clone(),
878 order_key: self.order_key.clone(),
879 }
880 }
881
882 #[inline]
883 fn clone_from(&mut self, source: &Self) {
884 self.row.clone_from(&source.row);
885 self.order_key.clone_from(&source.order_key);
886 }
887 }
888
889 impl Multiply<Diff> for Top1Monoid {
890 type Output = Self;
891
892 fn multiply(self, factor: &Diff) -> Self {
893 assert!(factor.is_positive());
898 self
899 }
900 }
901
902 impl Ord for Top1Monoid {
903 fn cmp(&self, other: &Self) -> Ordering {
904 debug_assert_eq!(self.order_key, other.order_key);
905
906 let left: Vec<_> = self.row.unpack();
909 let right: Vec<_> = other.row.unpack();
910 mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
911 }
912 }
913 impl PartialOrd for Top1Monoid {
914 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
915 Some(self.cmp(other))
916 }
917 }
918
919 impl Semigroup for Top1Monoid {
920 fn plus_equals(&mut self, rhs: &Self) {
921 let cmp = (*self).cmp(rhs);
922 if cmp == Ordering::Greater {
924 self.clone_from(rhs);
925 }
926 }
927 }
928
929 impl IsZero for Top1Monoid {
930 fn is_zero(&self) -> bool {
931 false
932 }
933 }
934
935 impl Columnation for Top1Monoid {
936 type InnerRegion = Top1MonoidRegion;
937 }
938
939 #[derive(Default)]
940 pub struct Top1MonoidRegion {
941 row_region: <Row as Columnation>::InnerRegion,
942 order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
943 }
944
945 impl Region for Top1MonoidRegion {
946 type Item = Top1Monoid;
947
948 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
949 let row = unsafe { self.row_region.copy(&item.row) };
950 let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
951 Self::Item { row, order_key }
952 }
953
954 fn clear(&mut self) {
955 self.row_region.clear();
956 self.order_key_region.clear();
957 }
958
959 fn reserve_items<'a, I>(&mut self, items1: I)
960 where
961 Self: 'a,
962 I: Iterator<Item = &'a Self::Item> + Clone,
963 {
964 let items2 = items1.clone();
965 self.row_region
966 .reserve_items(items1.into_iter().map(|s| &s.row));
967 self.order_key_region
968 .reserve_items(items2.into_iter().map(|s| &s.order_key));
969 }
970
971 fn reserve_regions<'a, I>(&mut self, regions1: I)
972 where
973 Self: 'a,
974 I: Iterator<Item = &'a Self> + Clone,
975 {
976 let regions2 = regions1.clone();
977 self.row_region
978 .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
979 self.order_key_region
980 .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
981 }
982
983 fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
984 self.row_region.heap_size(&mut callback);
985 self.order_key_region.heap_size(callback);
986 }
987 }
988
989 #[derive(Debug)]
991 pub struct Top1MonoidShared {
992 pub order_key: Vec<ColumnOrder>,
993 pub left: DatumVec,
994 pub right: DatumVec,
995 }
996
997 #[derive(Debug, Clone)]
1000 pub struct Top1MonoidLocal {
1001 pub row: Row,
1002 pub shared: Rc<RefCell<Top1MonoidShared>>,
1003 }
1004
1005 impl Top1MonoidLocal {
1006 pub fn into_row(self) -> Row {
1007 self.row
1008 }
1009 }
1010
1011 impl PartialEq for Top1MonoidLocal {
1012 fn eq(&self, other: &Self) -> bool {
1013 self.row.eq(&other.row)
1014 }
1015 }
1016
1017 impl Eq for Top1MonoidLocal {}
1018
1019 impl Hash for Top1MonoidLocal {
1020 fn hash<H: Hasher>(&self, state: &mut H) {
1021 self.row.hash(state);
1022 }
1023 }
1024
1025 impl Ord for Top1MonoidLocal {
1026 fn cmp(&self, other: &Self) -> Ordering {
1027 debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
1028 let Top1MonoidShared {
1029 left,
1030 right,
1031 order_key,
1032 } = &mut *self.shared.borrow_mut();
1033
1034 let left = left.borrow_with(&self.row);
1035 let right = right.borrow_with(&other.row);
1036 mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
1037 }
1038 }
1039
1040 impl PartialOrd for Top1MonoidLocal {
1041 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1042 Some(self.cmp(other))
1043 }
1044 }
1045
1046 impl Semigroup for Top1MonoidLocal {
1047 fn plus_equals(&mut self, rhs: &Self) {
1048 let cmp = (*self).cmp(rhs);
1049 if cmp == Ordering::Greater {
1051 self.clone_from(rhs);
1052 }
1053 }
1054 }
1055
1056 impl IsZero for Top1MonoidLocal {
1057 fn is_zero(&self) -> bool {
1058 false
1059 }
1060 }
1061}