1use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::BatchContainer;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
27use mz_compute_types::plan::ArrangementStrategy;
28use mz_compute_types::plan::top_k::{
29 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
30};
31use mz_expr::func::CastUint64ToInt64;
32use mz_expr::{BinaryFunc, Columns, Eval, EvalError, MirScalarExpr, UnaryFunc, func};
33use mz_ore::cast::CastFrom;
34use mz_ore::soft_assert_or_log;
35use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
36use mz_timely_util::columnation::ColumnationChunker;
37use mz_timely_util::operator::CollectionExt;
38use timely::Container;
39use timely::container::{CapacityContainerBuilder, PushInto};
40use timely::dataflow::channels::pact::Pipeline;
41use timely::dataflow::operators::Operator;
42
43use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
44use crate::extensions::reduce::{ClearContainer, MzReduce};
45use crate::render::Pairer;
46use crate::render::context::{CollectionBundle, Context};
47use crate::render::errors::DataflowErrorSer;
48use crate::render::errors::MaybeValidatingRow;
49use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
50use mz_row_spine::{
51 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
52};
53
54impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime>
56 Context<'scope, T>
57{
58 pub(crate) fn render_topk(
59 &self,
60 input: CollectionBundle<'scope, T>,
61 top_k_plan: TopKPlan,
62 temporal_bucketing_strategy: ArrangementStrategy,
63 ) -> CollectionBundle<'scope, T> {
64 let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);
65
66 if matches!(
84 temporal_bucketing_strategy,
85 ArrangementStrategy::TemporalBucketing
86 ) {
87 let must_consolidate = match &top_k_plan {
88 TopKPlan::MonotonicTop1(p) => p.must_consolidate,
89 TopKPlan::MonotonicTopK(p) => p.must_consolidate,
90 TopKPlan::Basic(_) => true,
91 };
92 soft_assert_or_log!(
93 must_consolidate,
94 "TopK with `TemporalBucketing` should not have `must_consolidate = false`; \
95 `RelaxMustConsolidate` only runs on single-time dataflows where \
96 `mz_now()` has been const-folded and no temporal bucketing is set",
97 );
98 }
99 let ok_input = if matches!(
100 temporal_bucketing_strategy,
101 ArrangementStrategy::TemporalBucketing
102 ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
103 {
104 let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
105 .get(&self.config_set)
106 .try_into()
107 .expect("must fit");
108 T::maybe_apply_temporal_bucketing(ok_input.inner, self.as_of_frontier.clone(), summary)
109 } else {
110 ok_input
111 };
112
113 let outer_scope = ok_input.scope();
115 let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| {
116 let ok_input = ok_input.enter_region(inner);
117 let mut err_collection = err_input.enter_region(inner);
118
119 match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
127 None => {}
128 Some((Some(Ok(literal)), _))
129 if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
130 Some((_, expr)) => {
131 let expr = expr.clone();
137 let mut datum_vec = mz_repr::DatumVec::new();
138 let errors = ok_input.clone().flat_map(move |row| {
139 let temp_storage = mz_repr::RowArena::new();
140 let datums = datum_vec.borrow_with(&row);
141 match expr.eval(&datums[..], &temp_storage) {
142 Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
143 Some(EvalError::NegLimit.into())
144 }
145 Ok(_) => None,
146 Err(e) => Some(e.into()),
147 }
148 });
149 err_collection = err_collection.concat(errors);
150 }
151 }
152
153 let ok_result = match top_k_plan {
154 TopKPlan::MonotonicTop1(MonotonicTop1Plan {
155 group_key,
156 order_key,
157 must_consolidate,
158 }) => {
159 let (oks, errs) = self.render_top1_monotonic(
160 ok_input,
161 group_key,
162 order_key,
163 must_consolidate,
164 );
165 err_collection = err_collection.concat(errs);
166 oks
167 }
168 TopKPlan::MonotonicTopK(MonotonicTopKPlan {
169 order_key,
170 group_key,
171 arity,
172 mut limit,
173 must_consolidate,
174 }) => {
175 if let Some(expr) = limit.as_mut() {
177 let mut map = BTreeMap::new();
178 for (index, column) in group_key.iter().enumerate() {
179 map.insert(*column, index);
180 }
181 expr.permute_map(&map);
182 }
183
184 let mut datum_vec = mz_repr::DatumVec::new();
186 let ok_scope = ok_input.scope();
187 let collection = ok_input
188 .map(move |row| {
189 let group_row = {
190 let datums = datum_vec.borrow_with(&row);
191 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
192 };
193 (group_row, row)
194 })
195 .consolidate_named_if::<KeyBatcher<_, _, _>>(
196 must_consolidate,
197 "Consolidated MonotonicTopK input",
198 );
199
200 let error_logger = self.error_logger();
202 let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
203 error_logger.log(
204 "Non-monotonic input to MonotonicTopK",
205 &format!("data={data:?}, diff={diff}"),
206 );
207 let m = "tried to build monotonic top-k on non-monotonic input".into();
208 (DataflowErrorSer::from(EvalError::Internal(m)), Diff::ONE)
209 });
210 err_collection = err_collection.concat(errs);
211
212 let collection = if let Some(limit) = limit.clone() {
218 render_intra_ts_thinning(collection, order_key.clone(), limit)
219 } else {
220 collection
221 };
222
223 let pairer = Pairer::new(1);
224 let collection = collection.map(move |(group_row, row)| {
225 let hash = row.hashed();
226 let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
227 (hash_key, row)
228 });
229
230 let delay = std::time::Duration::from_secs(10);
239 let (retractions_var, retractions) = SemigroupVariable::new(
240 ok_scope,
241 <T as crate::render::RenderTimestamp>::system_delay(
242 delay.try_into().expect("must fit"),
243 ),
244 );
245 let thinned = collection.clone().concat(retractions.negate());
246
247 let (result, errs) =
253 self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
254 let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
256 result,
257 "Monotonic TopK final consolidate",
258 );
259 retractions_var.set(collection.concat(result.clone().negate()));
260 soft_assert_or_log!(
261 errs.is_none(),
262 "requested no validation, but received error collection"
263 );
264
265 result.map(|(_key_hash, row)| row)
266 }
267 TopKPlan::Basic(BasicTopKPlan {
268 group_key,
269 order_key,
270 offset,
271 mut limit,
272 arity,
273 buckets,
274 }) => {
275 if let Some(expr) = limit.as_mut() {
277 let mut map = BTreeMap::new();
278 for (index, column) in group_key.iter().enumerate() {
279 map.insert(*column, index);
280 }
281 expr.permute_map(&map);
282 }
283
284 let (oks, errs) = self.build_topk(
285 ok_input, group_key, order_key, offset, limit, arity, buckets,
286 );
287 err_collection = err_collection.concat(errs);
288 oks
289 }
290 };
291
292 (
294 ok_result.leave_region(outer_scope),
295 err_collection.leave_region(outer_scope),
296 )
297 });
298
299 CollectionBundle::from_collections(ok_result, err_collection)
300 }
301
302 fn build_topk<'s>(
304 &self,
305 collection: VecCollection<'s, T, Row, Diff>,
306 group_key: Vec<usize>,
307 order_key: Vec<mz_expr::ColumnOrder>,
308 offset: usize,
309 limit: Option<mz_expr::MirScalarExpr>,
310 arity: usize,
311 buckets: Vec<u64>,
312 ) -> (
313 VecCollection<'s, T, Row, Diff>,
314 VecCollection<'s, T, DataflowErrorSer, Diff>,
315 ) {
316 let pairer = Pairer::new(1);
317 let mut datum_vec = mz_repr::DatumVec::new();
318 let mut collection = collection.map({
319 move |row| {
320 let group_row = {
321 let row_hash = row.hashed();
322 let datums = datum_vec.borrow_with(&row);
323 let iterator = group_key.iter().map(|i| datums[*i]);
324 pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
325 };
326 (group_row, row)
327 }
328 });
329
330 let mut validating = true;
331 let mut err_collection: Option<VecCollection<'s, T, _, _>> = None;
332
333 if let Some(mut limit) = limit.clone() {
334 if offset > 0 {
337 let new_limit = (|| {
338 let limit = limit.as_literal_int64()?;
339 let offset = i64::try_from(offset).ok()?;
340 limit.checked_add(offset)
341 })();
342
343 if let Some(new_limit) = new_limit {
344 limit =
345 MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
346 } else {
347 limit = limit.call_binary(
348 MirScalarExpr::literal_ok(
349 Datum::UInt64(u64::cast_from(offset)),
350 ReprScalarType::UInt64,
351 )
352 .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
353 BinaryFunc::AddInt64(func::AddInt64),
354 );
355 }
356 }
357
358 for bucket in buckets.into_iter() {
362 let (oks, errs) = self.build_topk_stage(
366 collection,
367 order_key.clone(),
368 bucket,
369 0,
370 Some(limit.clone()),
371 arity,
372 validating,
373 );
374 collection = oks;
375 if validating {
376 err_collection = errs;
377 validating = false;
378 }
379 }
380 }
381
382 let (oks, errs) = self.build_topk_stage(
386 collection, order_key, 1u64, offset, limit, arity, validating,
387 );
388 let oks =
390 CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
391 collection = oks;
392 if validating {
393 err_collection = errs;
394 }
395 (
396 collection.map(|(_key_hash, row)| row),
397 err_collection.expect("at least one stage validated its inputs"),
398 )
399 }
400
401 fn build_topk_stage<'s>(
434 &self,
435 collection: VecCollection<'s, T, (Row, Row), Diff>,
436 order_key: Vec<mz_expr::ColumnOrder>,
437 modulus: u64,
438 offset: usize,
439 limit: Option<mz_expr::MirScalarExpr>,
440 arity: usize,
441 validating: bool,
442 ) -> (
443 VecCollection<'s, T, (Row, Row), Diff>,
444 Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
445 ) {
446 let input = collection.map(move |(hash_key, row)| {
449 let mut hash_key_iter = hash_key.iter();
450 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
451 let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
452 (hash_key, row)
453 });
454
455 let (input, oks, errs) = if validating {
457 let (input, stage) = build_topk_negated_stage::<
459 T,
460 RowValBuilder<_, _, _>,
461 RowValSpine<Result<Row, Row>, _, _>,
462 >(&input, order_key, offset, limit, arity);
463 let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));
464
465 let error_logger = self.error_logger();
467 type CB<C> = CapacityContainerBuilder<C>;
468 let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
469 "Demuxing Errors",
470 move |(hk, result)| match result {
471 Err(v) => {
472 let mut hk_iter = hk.iter();
473 let h = hk_iter.next().unwrap().unwrap_uint64();
474 let k = SharedRow::pack(hk_iter);
475 let message = "Negative multiplicities in TopK";
476 error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
477 Err(EvalError::Internal(message.into()).into())
478 }
479 Ok(t) => Ok((hk, t)),
480 },
481 );
482 (input, oks, Some(errs))
483 } else {
484 let (input, stage) =
486 build_topk_negated_stage::<T, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
487 &input, order_key, offset, limit, arity,
488 );
489 let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));
491
492 (input, stage, None)
493 };
494 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
495 (oks.concat(input), errs)
496 }
497
498 fn render_top1_monotonic<'s>(
499 &self,
500 collection: VecCollection<'s, T, Row, Diff>,
501 group_key: Vec<usize>,
502 order_key: Vec<mz_expr::ColumnOrder>,
503 must_consolidate: bool,
504 ) -> (
505 VecCollection<'s, T, Row, Diff>,
506 VecCollection<'s, T, DataflowErrorSer, Diff>,
507 ) {
508 let collection = collection
513 .map({
514 let mut datum_vec = mz_repr::DatumVec::new();
515 move |row| {
516 let group_key = {
518 let datums = datum_vec.borrow_with(&row);
519 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
520 };
521 (group_key, row)
522 }
523 })
524 .consolidate_named_if::<KeyBatcher<_, _, _>>(
525 must_consolidate,
526 "Consolidated MonotonicTop1 input",
527 );
528
529 let error_logger = self.error_logger();
531 let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
532 error_logger.log(
533 "Non-monotonic input to MonotonicTop1",
534 &format!("data={data:?}, diff={diff}"),
535 );
536 let m = "tried to build monotonic top-1 on non-monotonic input".into();
537 (EvalError::Internal(m).into(), Diff::ONE)
538 });
539 let partial: KeyCollection<_, _, _> = partial
540 .explode_one(move |(group_key, row)| {
541 (
542 group_key,
543 monoids::Top1Monoid {
544 row,
545 order_key: order_key.clone(),
546 },
547 )
548 })
549 .into();
550 let result = partial
551 .mz_arrange::<
552 ColumnationChunker<_>,
553 RowBatcher<_, _>,
554 RowBuilder<_, _>,
555 RowSpine<_, _>,
556 >(
557 "Arranged MonotonicTop1 partial [val: empty]",
558 )
559 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
560 "MonotonicTop1",
561 move |_key, input, output| {
562 let accum: &monoids::Top1Monoid = &input[0].1;
563 output.push((accum.row.clone(), Diff::ONE));
564 },
565 );
566 (result.as_collection(|_k, v| v.to_row()), errs)
568 }
569}
570
/// Arranges `input` by its `(hash, group key)` key and reduces each key to a set of
/// corrections that turn the input into its top-k.
///
/// For each key whose rows must shrink (because `offset > 0` or the total count exceeds
/// the limit), the reduction emits:
/// - the negation of every input row, and
/// - the rows that survive skipping `offset` and capping at `limit`, taken in `order_key`
///   order.
///
/// When `Tr::ValOwn::into_error()` returns `Some`, which is the validating mode, a key
/// with any non-positive multiplicity instead emits a single error value and nothing
/// else.
///
/// Returns both the input arrangement and the reduction.
fn build_topk_negated_stage<'s, T, Bu, Tr>(
    input: &VecCollection<'s, T, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
    Arranged<'s, TraceAgent<Tr>>,
)
where
    T: MzTimestamp,
    Bu: Builder<
        Time = T,
        Input: Container + ClearContainer + PushInto<((Row, Tr::ValOwn), T, Diff)>,
        Output = Tr::Batch,
    >,
    Tr: for<'a> Trace<
        Key<'a> = DatumSeq<'a>,
        KeyContainer: BatchContainer<Owned = Row>,
        ValOwn: Data + MaybeValidatingRow<Row, Row>,
        Time = T,
        Diff = Diff,
    > + 'static,
    Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
{
    // Scratch datum buffer reused across reducer invocations.
    let mut datum_vec = mz_repr::DatumVec::new();

    let arranged = input
        .clone()
        .mz_arrange::<
            ColumnationChunker<_>,
            RowRowBatcher<_, _>,
            RowRowBuilder<_, _>,
            RowRowSpine<_, _>,
        >(
            "Arranged TopK input",
        );

    // Resolve literal limits once. A null literal means "no limit" (`Diff::MAX`); any
    // non-literal expression is kept for per-key evaluation.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged
        .clone()
        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
            move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
                let limit = match &limit {
                    Some(Ok(lit)) => Some(*lit),
                    Some(Err(expr)) => {
                        let temp_storage = mz_repr::RowArena::new();
                        // Skip the leading hash datum; the expression is evaluated over the
                        // group-key datums only.
                        let _hash = hash_key.next();
                        let mut key_datums = datum_vec.borrow();
                        key_datums.extend(hash_key);
                        // Evaluation errors fall back to a limit of zero. NOTE(review):
                        // presumably the error itself is reported by the caller's limit
                        // validation.
                        let datum_limit = expr
                            .eval(&key_datums, &temp_storage)
                            .unwrap_or(Datum::Int64(0));
                        Some(match datum_limit {
                            Datum::Null => Diff::MAX,
                            d => Diff::from(d.unwrap_int64()),
                        })
                    }
                    None => None,
                };

                // Validating mode: report the first non-positive multiplicity and stop.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (datums, diff) in source.iter() {
                        if diff.is_positive() {
                            continue;
                        }
                        target.push((err((*datums).to_row()), Diff::ONE));
                        return;
                    }
                }

                // Emit nothing (i.e. keep the input unchanged) unless rows must be dropped.
                let must_shrink = offset > 0
                    || limit
                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                        .unwrap_or(false);
                if !must_shrink {
                    return;
                }

                // Retract all input; the surviving rows are re-added below.
                target.reserve(source.len());
                for (datums, diff) in source.iter() {
                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
                }
                let mut offset = offset;
                let mut limit = limit;

                // Flatten every row's datums into one buffer (each row is `arity` wide)
                // and sort row indexes by `order_key`, breaking ties on all datums.
                let mut indexes = (0..source.len()).collect::<Vec<_>>();
                let mut buffer = datum_vec.borrow();
                for (index, (datums, _)) in source.iter().enumerate() {
                    buffer.extend(*datums);
                    assert_eq!(buffer.len(), arity * (index + 1));
                }
                let width = buffer.len() / source.len();

                indexes.sort_by(|left, right| {
                    let left = &buffer[left * width..][..width];
                    let right = &buffer[right * width..][..width];
                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
                });

                // Walk rows in order: first consume `offset` units of count, then keep at
                // most `limit` units.
                for index in indexes.into_iter() {
                    let (datums, mut diff) = source[index];
                    if !diff.is_positive() {
                        continue;
                    }
                    if offset > 0 {
                        let to_skip =
                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                        offset -= to_skip;
                        diff -= Diff::try_from(to_skip).unwrap();
                    }
                    if let Some(limit) = &mut limit {
                        diff = std::cmp::min(diff, Diff::from(*limit));
                        *limit -= diff;
                    }
                    if diff.is_positive() {
                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                    }
                }
            }
        });
    (arranged, reduced)
}
733
734fn render_intra_ts_thinning<'s, T>(
735 collection: VecCollection<'s, T, (Row, Row), Diff>,
736 order_key: Vec<mz_expr::ColumnOrder>,
737 limit: mz_expr::MirScalarExpr,
738) -> VecCollection<'s, T, (Row, Row), Diff>
739where
740 T: timely::progress::Timestamp + Lattice,
741{
742 let mut datum_vec = mz_repr::DatumVec::new();
743
744 let mut aggregates = BTreeMap::new();
745 let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
746 order_key,
747 left: DatumVec::new(),
748 right: DatumVec::new(),
749 }));
750 collection
751 .inner
752 .unary_notify(
753 Pipeline,
754 "TopKIntraTimeThinning",
755 [],
756 move |input, output, notificator| {
757 input.for_each_time(|time, data| {
758 let agg_time = aggregates
759 .entry(time.time().clone())
760 .or_insert_with(BTreeMap::new);
761 for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
762 {
763 let monoid = monoids::Top1MonoidLocal {
764 row,
765 shared: Rc::clone(&shared),
766 };
767
768 let limit = if let Some(l) = limit.as_literal_int64() {
770 l
771 } else {
772 let temp_storage = mz_repr::RowArena::new();
773 let key_datums = datum_vec.borrow_with(&grp_row);
774 let datum_limit = limit
777 .eval(&key_datums, &temp_storage)
778 .unwrap_or(mz_repr::Datum::Int64(0));
779 if datum_limit == Datum::Null {
780 i64::MAX
781 } else {
782 datum_limit.unwrap_int64()
783 }
784 };
785
786 let topk = agg_time
787 .entry((grp_row, record_time))
788 .or_insert_with(move || topk_agg::TopKBatch::new(limit));
789 topk.update(monoid, diff.into_inner());
790 }
791 notificator.notify_at(time.retain(0));
792 });
793
794 notificator.for_each(|time, _, _| {
795 if let Some(aggs) = aggregates.remove(time.time()) {
796 let mut session = output.session(&time);
797 for ((grp_row, record_time), topk) in aggs {
798 session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
799 (
800 (grp_row.clone(), monoid.into_row()),
801 record_time.clone(),
802 diff.into(),
803 )
804 }))
805 }
806 }
807 });
808 },
809 )
810 .as_collection()
811}
812
813pub mod topk_agg {
815 use differential_dataflow::consolidation;
816 use smallvec::SmallVec;
817
818 pub struct TopKBatch<T> {
825 updates: SmallVec<[(T, i64); 16]>,
826 clean: usize,
827 limit: i64,
828 }
829
830 impl<T: Ord> TopKBatch<T> {
831 pub fn new(limit: i64) -> Self {
832 Self {
833 updates: SmallVec::new(),
834 clean: 0,
835 limit,
836 }
837 }
838
839 #[inline]
846 pub fn update(&mut self, item: T, value: i64) {
847 self.updates.push((item, value));
848 self.maintain_bounds();
849 }
850
851 #[inline]
857 pub fn compact(&mut self) {
858 if self.clean < self.updates.len() && self.updates.len() > 1 {
859 let len = consolidation::consolidate_slice(&mut self.updates);
860 self.updates.truncate(len);
861
862 let mut limit = self.limit;
864 self.updates.retain(|x| {
865 if limit > 0 {
866 limit -= x.1;
867 true
868 } else {
869 false
870 }
871 });
872 if limit < 0 {
880 if let Some(item) = self.updates.last_mut() {
881 item.1 -= -limit;
884 }
885 }
886 }
887 self.clean = self.updates.len();
888 }
889
890 fn maintain_bounds(&mut self) {
893 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
895 self.compact()
896 }
897 }
898 }
899
900 impl<T: Ord> IntoIterator for TopKBatch<T> {
901 type Item = (T, i64);
902 type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
903
904 fn into_iter(mut self) -> Self::IntoIter {
905 self.compact();
906 self.updates.into_iter()
907 }
908 }
909}
910
911pub mod monoids {
913 use std::cell::RefCell;
914 use std::cmp::Ordering;
915 use std::hash::{Hash, Hasher};
916 use std::rc::Rc;
917
918 use columnation::{Columnation, Region};
919 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
920 use mz_expr::ColumnOrder;
921 use mz_repr::{DatumVec, Diff, Row};
922 use serde::{Deserialize, Serialize};
923
924 #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
926 pub struct Top1Monoid {
927 pub row: Row,
928 pub order_key: Vec<ColumnOrder>,
929 }
930
931 impl Clone for Top1Monoid {
932 #[inline]
933 fn clone(&self) -> Self {
934 Self {
935 row: self.row.clone(),
936 order_key: self.order_key.clone(),
937 }
938 }
939
940 #[inline]
941 fn clone_from(&mut self, source: &Self) {
942 self.row.clone_from(&source.row);
943 self.order_key.clone_from(&source.order_key);
944 }
945 }
946
947 impl Multiply<Diff> for Top1Monoid {
948 type Output = Self;
949
950 fn multiply(self, factor: &Diff) -> Self {
951 assert!(factor.is_positive());
956 self
957 }
958 }
959
960 impl Ord for Top1Monoid {
961 fn cmp(&self, other: &Self) -> Ordering {
962 debug_assert_eq!(self.order_key, other.order_key);
963
964 let left: Vec<_> = self.row.unpack();
967 let right: Vec<_> = other.row.unpack();
968 mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
969 }
970 }
971 impl PartialOrd for Top1Monoid {
972 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
973 Some(self.cmp(other))
974 }
975 }
976
977 impl Semigroup for Top1Monoid {
978 fn plus_equals(&mut self, rhs: &Self) {
979 let cmp = (*self).cmp(rhs);
980 if cmp == Ordering::Greater {
982 self.clone_from(rhs);
983 }
984 }
985 }
986
987 impl IsZero for Top1Monoid {
988 fn is_zero(&self) -> bool {
989 false
990 }
991 }
992
993 impl Columnation for Top1Monoid {
994 type InnerRegion = Top1MonoidRegion;
995 }
996
997 #[derive(Default)]
998 pub struct Top1MonoidRegion {
999 row_region: <Row as Columnation>::InnerRegion,
1000 order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
1001 }
1002
1003 impl Region for Top1MonoidRegion {
1004 type Item = Top1Monoid;
1005
1006 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
1007 let row = unsafe { self.row_region.copy(&item.row) };
1008 let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
1009 Self::Item { row, order_key }
1010 }
1011
1012 fn clear(&mut self) {
1013 self.row_region.clear();
1014 self.order_key_region.clear();
1015 }
1016
1017 fn reserve_items<'a, I>(&mut self, items1: I)
1018 where
1019 Self: 'a,
1020 I: Iterator<Item = &'a Self::Item> + Clone,
1021 {
1022 let items2 = items1.clone();
1023 self.row_region
1024 .reserve_items(items1.into_iter().map(|s| &s.row));
1025 self.order_key_region
1026 .reserve_items(items2.into_iter().map(|s| &s.order_key));
1027 }
1028
1029 fn reserve_regions<'a, I>(&mut self, regions1: I)
1030 where
1031 Self: 'a,
1032 I: Iterator<Item = &'a Self> + Clone,
1033 {
1034 let regions2 = regions1.clone();
1035 self.row_region
1036 .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
1037 self.order_key_region
1038 .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
1039 }
1040
1041 fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
1042 self.row_region.heap_size(&mut callback);
1043 self.order_key_region.heap_size(callback);
1044 }
1045 }
1046
1047 #[derive(Debug)]
1049 pub struct Top1MonoidShared {
1050 pub order_key: Vec<ColumnOrder>,
1051 pub left: DatumVec,
1052 pub right: DatumVec,
1053 }
1054
1055 #[derive(Debug, Clone)]
1058 pub struct Top1MonoidLocal {
1059 pub row: Row,
1060 pub shared: Rc<RefCell<Top1MonoidShared>>,
1061 }
1062
1063 impl Top1MonoidLocal {
1064 pub fn into_row(self) -> Row {
1065 self.row
1066 }
1067 }
1068
1069 impl PartialEq for Top1MonoidLocal {
1070 fn eq(&self, other: &Self) -> bool {
1071 self.row.eq(&other.row)
1072 }
1073 }
1074
1075 impl Eq for Top1MonoidLocal {}
1076
1077 impl Hash for Top1MonoidLocal {
1078 fn hash<H: Hasher>(&self, state: &mut H) {
1079 self.row.hash(state);
1080 }
1081 }
1082
1083 impl Ord for Top1MonoidLocal {
1084 fn cmp(&self, other: &Self) -> Ordering {
1085 debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
1086 let Top1MonoidShared {
1087 left,
1088 right,
1089 order_key,
1090 } = &mut *self.shared.borrow_mut();
1091
1092 let left = left.borrow_with(&self.row);
1093 let right = right.borrow_with(&other.row);
1094 mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
1095 }
1096 }
1097
1098 impl PartialOrd for Top1MonoidLocal {
1099 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1100 Some(self.cmp(other))
1101 }
1102 }
1103
1104 impl Semigroup for Top1MonoidLocal {
1105 fn plus_equals(&mut self, rhs: &Self) {
1106 let cmp = (*self).cmp(rhs);
1107 if cmp == Ordering::Greater {
1109 self.clone_from(rhs);
1110 }
1111 }
1112 }
1113
1114 impl IsZero for Top1MonoidLocal {
1115 fn is_zero(&self) -> bool {
1116 false
1117 }
1118 }
1119}