1use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::BatchContainer;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
27use mz_compute_types::plan::ArrangementStrategy;
28use mz_compute_types::plan::top_k::{
29 BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
30};
31use mz_expr::func::CastUint64ToInt64;
32use mz_expr::{BinaryFunc, Columns, Eval, EvalError, MirScalarExpr, UnaryFunc, func};
33use mz_ore::cast::CastFrom;
34use mz_ore::soft_assert_or_log;
35use mz_repr::fixed_length::ExtendDatums;
36use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
37use mz_timely_util::columnation::ColumnationChunker;
38use mz_timely_util::operator::CollectionExt;
39use timely::Container;
40use timely::container::{CapacityContainerBuilder, PushInto};
41use timely::dataflow::channels::pact::Pipeline;
42use timely::dataflow::operators::Operator;
43
44use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
45use crate::extensions::reduce::{ClearContainer, MzReduce};
46use crate::render::Pairer;
47use crate::render::context::{CollectionBundle, Context};
48use crate::render::errors::DataflowErrorSer;
49use crate::render::errors::MaybeValidatingRow;
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51use mz_row_spine::{
52 DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
53};
54
55impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime>
57 Context<'scope, T>
58{
    /// Renders `top_k_plan` over `input` and returns the resulting ok and err collections as a
    /// bundle.
    ///
    /// Steps, in the order the operators are built:
    /// * `input` is split into an ok collection and an err collection.
    /// * When `temporal_bucketing_strategy` is `TemporalBucketing` and the
    ///   `ENABLE_COMPUTE_TEMPORAL_BUCKETING` config is on, the ok collection is routed through
    ///   `T::maybe_apply_temporal_bucketing`.
    /// * Inside a region named "TopK", a limit expression that is not a known-good literal is
    ///   evaluated against every input row so that evaluation errors and negative limits are
    ///   added to the err collection.
    /// * The plan variant selects the rendering: `render_top1_monotonic`, a single
    ///   `build_topk_stage` fed by a retraction feedback loop, or `build_topk`.
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<'scope, T>,
        top_k_plan: TopKPlan,
        temporal_bucketing_strategy: ArrangementStrategy,
    ) -> CollectionBundle<'scope, T> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // Sanity check only: with temporal bucketing requested, the plan is expected to ask for
        // consolidation. A violation is logged / soft-asserted, rendering continues regardless.
        if matches!(
            temporal_bucketing_strategy,
            ArrangementStrategy::TemporalBucketing
        ) {
            let must_consolidate = match &top_k_plan {
                TopKPlan::MonotonicTop1(p) => p.must_consolidate,
                TopKPlan::MonotonicTopK(p) => p.must_consolidate,
                // Basic plans carry no `must_consolidate` flag; they always pass this check.
                TopKPlan::Basic(_) => true,
            };
            soft_assert_or_log!(
                must_consolidate,
                "TopK with `TemporalBucketing` should not have `must_consolidate = false`; \
                 `RelaxMustConsolidate` only runs on single-time dataflows where \
                 `mz_now()` has been const-folded and no temporal bucketing is set",
            );
        }
        // Bucketing is applied only if both the plan requests it and the feature flag is on.
        let ok_input = if matches!(
            temporal_bucketing_strategy,
            ArrangementStrategy::TemporalBucketing
        ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
        {
            let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
                .get(&self.config_set)
                .try_into()
                .expect("must fit");
            T::maybe_apply_temporal_bucketing(ok_input.inner, self.as_of_frontier.clone(), summary)
        } else {
            ok_input
        };

        let outer_scope = ok_input.scope();
        // Everything below is built inside the "TopK" region; both results leave the region
        // again at the end of the closure.
        let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Decide whether the limit needs a per-row check.
            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                // No limit: nothing to validate.
                None => {}
                // A literal limit that is NULL or non-negative can never produce an error.
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                // Anything else (non-literal, erroring literal, negative literal) is evaluated
                // against each input row; errors and negative results become error records.
                Some((_, expr)) => {
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.clone().flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // Below, the limit is evaluated against the packed group-key row rather than
                    // the full input row, so remap each referenced column to its position within
                    // `group_key`.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let mut datum_vec = mz_repr::DatumVec::new();
                    let ok_scope = ok_input.scope();
                    // Pair every row with its packed group key; consolidate only if requested.
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    let error_logger = self.error_logger();
                    // Updates that violate monotonicity are logged and turned into an internal
                    // error record in `errs`.
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowErrorSer::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(errs);

                    // With a limit present, first thin the input per timestamp and group.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    // Re-key by (row hash, group key...), the key layout `build_topk_stage`
                    // expects (it reads the first key datum as a u64 hash).
                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // Feedback loop: `retractions` is later defined as the input records that are
                    // not part of the stage output, and is subtracted from the stage input.
                    // The feedback summary is derived from a 10 second duration via
                    // `RenderTimestamp::system_delay`.
                    let delay = std::time::Duration::from_secs(10);
                    let (retractions_var, retractions) = SemigroupVariable::new(
                        ok_scope,
                        <T as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.clone().concat(retractions.negate());

                    // A single stage (modulus 1, no offset), rendered without validation.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        result,
                        "Monotonic TopK final consolidate",
                    );
                    // Close the loop: retractions = input - output.
                    retractions_var.set(collection.concat(result.clone().negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    // Drop the (hash, group key) key and keep only the rows.
                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // Same remapping of limit column references to group-key positions as in the
                    // monotonic top-k case.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
            };

            (
                ok_result.leave_region(outer_scope),
                err_collection.leave_region(outer_scope),
            )
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }
302
303 fn build_topk<'s>(
305 &self,
306 collection: VecCollection<'s, T, Row, Diff>,
307 group_key: Vec<usize>,
308 order_key: Vec<mz_expr::ColumnOrder>,
309 offset: usize,
310 limit: Option<mz_expr::MirScalarExpr>,
311 arity: usize,
312 buckets: Vec<u64>,
313 ) -> (
314 VecCollection<'s, T, Row, Diff>,
315 VecCollection<'s, T, DataflowErrorSer, Diff>,
316 ) {
317 let pairer = Pairer::new(1);
318 let mut datum_vec = mz_repr::DatumVec::new();
319 let mut collection = collection.map({
320 move |row| {
321 let group_row = {
322 let row_hash = row.hashed();
323 let datums = datum_vec.borrow_with(&row);
324 let iterator = group_key.iter().map(|i| datums[*i]);
325 pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
326 };
327 (group_row, row)
328 }
329 });
330
331 let mut validating = true;
332 let mut err_collection: Option<VecCollection<'s, T, _, _>> = None;
333
334 if let Some(mut limit) = limit.clone() {
335 if offset > 0 {
338 let new_limit = (|| {
339 let limit = limit.as_literal_int64()?;
340 let offset = i64::try_from(offset).ok()?;
341 limit.checked_add(offset)
342 })();
343
344 if let Some(new_limit) = new_limit {
345 limit =
346 MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
347 } else {
348 limit = limit.call_binary(
349 MirScalarExpr::literal_ok(
350 Datum::UInt64(u64::cast_from(offset)),
351 ReprScalarType::UInt64,
352 )
353 .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
354 BinaryFunc::AddInt64(func::AddInt64),
355 );
356 }
357 }
358
359 for bucket in buckets.into_iter() {
363 let (oks, errs) = self.build_topk_stage(
367 collection,
368 order_key.clone(),
369 bucket,
370 0,
371 Some(limit.clone()),
372 arity,
373 validating,
374 );
375 collection = oks;
376 if validating {
377 err_collection = errs;
378 validating = false;
379 }
380 }
381 }
382
383 let (oks, errs) = self.build_topk_stage(
387 collection, order_key, 1u64, offset, limit, arity, validating,
388 );
389 let oks =
391 CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
392 collection = oks;
393 if validating {
394 err_collection = errs;
395 }
396 (
397 collection.map(|(_key_hash, row)| row),
398 err_collection.expect("at least one stage validated its inputs"),
399 )
400 }
401
402 fn build_topk_stage<'s>(
435 &self,
436 collection: VecCollection<'s, T, (Row, Row), Diff>,
437 order_key: Vec<mz_expr::ColumnOrder>,
438 modulus: u64,
439 offset: usize,
440 limit: Option<mz_expr::MirScalarExpr>,
441 arity: usize,
442 validating: bool,
443 ) -> (
444 VecCollection<'s, T, (Row, Row), Diff>,
445 Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
446 ) {
447 let input = collection.map(move |(hash_key, row)| {
450 let mut hash_key_iter = hash_key.iter();
451 let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
452 let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
453 (hash_key, row)
454 });
455
456 let (input, oks, errs) = if validating {
458 let (input, stage) = build_topk_negated_stage::<
460 T,
461 RowValBuilder<_, _, _>,
462 RowValSpine<Result<Row, Row>, _, _>,
463 >(&input, order_key, offset, limit, arity);
464 let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));
465
466 let error_logger = self.error_logger();
468 type CB<C> = CapacityContainerBuilder<C>;
469 let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
470 "Demuxing Errors",
471 move |(hk, result)| match result {
472 Err(v) => {
473 let mut hk_iter = hk.iter();
474 let h = hk_iter.next().unwrap().unwrap_uint64();
475 let k = SharedRow::pack(hk_iter);
476 let message = "Negative multiplicities in TopK";
477 error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
478 Err(EvalError::Internal(message.into()).into())
479 }
480 Ok(t) => Ok((hk, t)),
481 },
482 );
483 (input, oks, Some(errs))
484 } else {
485 let (input, stage) =
487 build_topk_negated_stage::<T, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
488 &input, order_key, offset, limit, arity,
489 );
490 let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));
492
493 (input, stage, None)
494 };
495 let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
496 (oks.concat(input), errs)
497 }
498
499 fn render_top1_monotonic<'s>(
500 &self,
501 collection: VecCollection<'s, T, Row, Diff>,
502 group_key: Vec<usize>,
503 order_key: Vec<mz_expr::ColumnOrder>,
504 must_consolidate: bool,
505 ) -> (
506 VecCollection<'s, T, Row, Diff>,
507 VecCollection<'s, T, DataflowErrorSer, Diff>,
508 ) {
509 let collection = collection
514 .map({
515 let mut datum_vec = mz_repr::DatumVec::new();
516 move |row| {
517 let group_key = {
519 let datums = datum_vec.borrow_with(&row);
520 SharedRow::pack(group_key.iter().map(|i| datums[*i]))
521 };
522 (group_key, row)
523 }
524 })
525 .consolidate_named_if::<KeyBatcher<_, _, _>>(
526 must_consolidate,
527 "Consolidated MonotonicTop1 input",
528 );
529
530 let error_logger = self.error_logger();
532 let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
533 error_logger.log(
534 "Non-monotonic input to MonotonicTop1",
535 &format!("data={data:?}, diff={diff}"),
536 );
537 let m = "tried to build monotonic top-1 on non-monotonic input".into();
538 (EvalError::Internal(m).into(), Diff::ONE)
539 });
540 let partial: KeyCollection<_, _, _> = partial
541 .explode_one(move |(group_key, row)| {
542 (
543 group_key,
544 monoids::Top1Monoid {
545 row,
546 order_key: order_key.clone(),
547 },
548 )
549 })
550 .into();
551 let result = partial
552 .mz_arrange::<
553 ColumnationChunker<_>,
554 RowBatcher<_, _>,
555 RowBuilder<_, _>,
556 RowSpine<_, _>,
557 >(
558 "Arranged MonotonicTop1 partial [val: empty]",
559 )
560 .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
561 "MonotonicTop1",
562 move |_key, input, output| {
563 let accum: &monoids::Top1Monoid = &input[0].1;
564 output.push((accum.row.clone(), Diff::ONE));
565 },
566 );
567 (result.as_collection(|_k, v| v.to_row()), errs)
569 }
570}
571
/// Arranges `input` by its `(hash, group key...)` key and reduces every key to the updates that
/// remove the records falling outside the requested `offset` / `limit` window.
///
/// Returns `(arranged, reduced)`: the arrangement of the unmodified input and the arrangement
/// produced by the reduction. For each key the reduction emits either
/// * nothing, when `offset` is zero and the accumulated count does not exceed the limit (or there
///   is no limit),
/// * the negation of every input record followed by the records that remain after ordering by
///   `order_key` and applying `offset` and `limit`, or
/// * a single error value, when `Tr::ValOwn` supports errors (`into_error()` is `Some`) and a
///   record with a non-positive count is present.
///
/// `arity` is the expected number of datums per row; it is asserted while decoding.
fn build_topk_negated_stage<'s, T, Bu, Tr>(
    input: &VecCollection<'s, T, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
    Arranged<'s, TraceAgent<Tr>>,
)
where
    T: MzTimestamp,
    Bu: Builder<
            Time = T,
            Input: Container + ClearContainer + PushInto<((Row, Tr::ValOwn), T, Diff)>,
            Output = Tr::Batch,
        >,
    Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyContainer: BatchContainer<Owned = Row>,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = T,
            Diff = Diff,
        > + 'static,
    Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
{
    // Scratch buffer for decoded datums, reused across invocations of the reduce closure.
    let mut datum_vec = mz_repr::DatumVec::new();

    let arranged = input
        .clone()
        .mz_arrange::<
            ColumnationChunker<_>,
            RowRowBatcher<_, _>,
            RowRowBuilder<_, _>,
            RowRowSpine<_, _>,
        >(
            "Arranged TopK input",
        );

    // Pre-resolve literal limits once: `Ok(count)` for a literal (NULL maps to `Diff::MAX`),
    // `Err(expr)` for anything that must be evaluated per key.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged
        .clone()
        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
            move |hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
                // Determine the limit for this key.
                let limit = match &limit {
                    Some(Ok(lit)) => Some(*lit),
                    Some(Err(expr)) => {
                        let temp_storage = mz_repr::RowArena::new();
                        let mut key_datums = datum_vec.borrow();
                        hash_key.extend_datums(&temp_storage, &mut key_datums, None);
                        // Evaluate against the key without its first datum (the hash). An
                        // evaluation error falls back to a limit of zero here.
                        let datum_limit = expr
                            .eval(&key_datums[1..], &temp_storage)
                            .unwrap_or(Datum::Int64(0));
                        Some(match datum_limit {
                            Datum::Null => Diff::MAX,
                            d => Diff::from(d.unwrap_int64()),
                        })
                    }
                    None => None,
                };

                // Validating variant only: report the first record with a non-positive count as
                // an error and produce nothing else for this key.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (datums, diff) in source.iter() {
                        if diff.is_positive() {
                            continue;
                        }
                        target.push((err((*datums).to_row()), Diff::ONE));
                        return;
                    }
                }

                // Nothing has to be removed if there is no offset and the total count fits
                // within the limit.
                let must_shrink = offset > 0
                    || limit
                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                        .unwrap_or(false);
                if !must_shrink {
                    return;
                }

                // Start by negating every input record; the records to keep are added back
                // below. The reservation is sized by the input, not by the limit.
                target.reserve(source.len());
                for (datums, diff) in source.iter() {
                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
                }
                // Local counters that are decremented while walking the ordered records.
                let mut offset = offset;
                let mut limit = limit;

                // Positions into `source`, to be sorted into output order.
                let mut indexes = (0..source.len()).collect::<Vec<_>>();
                // Decode all rows into one flat buffer, `arity` datums per row.
                let temp_storage = mz_repr::RowArena::new();
                let mut buffer = datum_vec.borrow();
                for (index, (datums, _)) in source.iter().enumerate() {
                    datums.extend_datums(&temp_storage, &mut buffer, None);
                    assert_eq!(buffer.len(), arity * (index + 1));
                }
                let width = buffer.len() / source.len();

                // Order by `order_key`, breaking ties by comparing the full datum slices.
                indexes.sort_by(|left, right| {
                    let left = &buffer[left * width..][..width];
                    let right = &buffer[right * width..][..width];
                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
                });

                for index in indexes.into_iter() {
                    let (datums, mut diff) = source[index];
                    if !diff.is_positive() {
                        continue;
                    }
                    // Consume the remaining offset from this record's copies first.
                    if offset > 0 {
                        let to_skip =
                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                        offset -= to_skip;
                        diff -= Diff::try_from(to_skip).unwrap();
                    }
                    // Then cap the copies by what is left of the limit.
                    if let Some(limit) = &mut limit {
                        diff = std::cmp::min(diff, Diff::from(*limit));
                        *limit -= diff;
                    }
                    // Add back the copies that survive offset and limit.
                    if diff.is_positive() {
                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                    }
                }
            }
        });
    (arranged, reduced)
}
735
736fn render_intra_ts_thinning<'s, T>(
737 collection: VecCollection<'s, T, (Row, Row), Diff>,
738 order_key: Vec<mz_expr::ColumnOrder>,
739 limit: mz_expr::MirScalarExpr,
740) -> VecCollection<'s, T, (Row, Row), Diff>
741where
742 T: timely::progress::Timestamp + Lattice,
743{
744 let mut datum_vec = mz_repr::DatumVec::new();
745
746 let mut aggregates = BTreeMap::new();
747 let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
748 order_key,
749 left: DatumVec::new(),
750 right: DatumVec::new(),
751 }));
752 collection
753 .inner
754 .unary_notify(
755 Pipeline,
756 "TopKIntraTimeThinning",
757 [],
758 move |input, output, notificator| {
759 input.for_each_time(|time, data| {
760 let agg_time = aggregates
761 .entry(time.time().clone())
762 .or_insert_with(BTreeMap::new);
763 for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
764 {
765 let monoid = monoids::Top1MonoidLocal {
766 row,
767 shared: Rc::clone(&shared),
768 };
769
770 let limit = if let Some(l) = limit.as_literal_int64() {
772 l
773 } else {
774 let temp_storage = mz_repr::RowArena::new();
775 let key_datums = datum_vec.borrow_with(&grp_row);
776 let datum_limit = limit
779 .eval(&key_datums, &temp_storage)
780 .unwrap_or(mz_repr::Datum::Int64(0));
781 if datum_limit == Datum::Null {
782 i64::MAX
783 } else {
784 datum_limit.unwrap_int64()
785 }
786 };
787
788 let topk = agg_time
789 .entry((grp_row, record_time))
790 .or_insert_with(move || topk_agg::TopKBatch::new(limit));
791 topk.update(monoid, diff.into_inner());
792 }
793 notificator.notify_at(time.retain(0));
794 });
795
796 notificator.for_each(|time, _, _| {
797 if let Some(aggs) = aggregates.remove(time.time()) {
798 let mut session = output.session(&time);
799 for ((grp_row, record_time), topk) in aggs {
800 session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
801 (
802 (grp_row.clone(), monoid.into_row()),
803 record_time.clone(),
804 diff.into(),
805 )
806 }))
807 }
808 }
809 });
810 },
811 )
812 .as_collection()
813}
814
815pub mod topk_agg {
817 use differential_dataflow::consolidation;
818 use smallvec::SmallVec;
819
820 pub struct TopKBatch<T> {
827 updates: SmallVec<[(T, i64); 16]>,
828 clean: usize,
829 limit: i64,
830 }
831
832 impl<T: Ord> TopKBatch<T> {
833 pub fn new(limit: i64) -> Self {
834 Self {
835 updates: SmallVec::new(),
836 clean: 0,
837 limit,
838 }
839 }
840
841 #[inline]
848 pub fn update(&mut self, item: T, value: i64) {
849 self.updates.push((item, value));
850 self.maintain_bounds();
851 }
852
853 #[inline]
859 pub fn compact(&mut self) {
860 if self.clean < self.updates.len() && self.updates.len() > 1 {
861 let len = consolidation::consolidate_slice(&mut self.updates);
862 self.updates.truncate(len);
863
864 let mut limit = self.limit;
866 self.updates.retain(|x| {
867 if limit > 0 {
868 limit -= x.1;
869 true
870 } else {
871 false
872 }
873 });
874 if limit < 0 {
882 if let Some(item) = self.updates.last_mut() {
883 item.1 -= -limit;
886 }
887 }
888 }
889 self.clean = self.updates.len();
890 }
891
892 fn maintain_bounds(&mut self) {
895 if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
897 self.compact()
898 }
899 }
900 }
901
902 impl<T: Ord> IntoIterator for TopKBatch<T> {
903 type Item = (T, i64);
904 type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
905
906 fn into_iter(mut self) -> Self::IntoIter {
907 self.compact();
908 self.updates.into_iter()
909 }
910 }
911}
912
913pub mod monoids {
915 use std::cell::RefCell;
916 use std::cmp::Ordering;
917 use std::hash::{Hash, Hasher};
918 use std::rc::Rc;
919
920 use columnation::{Columnation, Region};
921 use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
922 use mz_expr::ColumnOrder;
923 use mz_repr::{DatumVec, Diff, Row};
924 use serde::{Deserialize, Serialize};
925
926 #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
928 pub struct Top1Monoid {
929 pub row: Row,
930 pub order_key: Vec<ColumnOrder>,
931 }
932
933 impl Clone for Top1Monoid {
934 #[inline]
935 fn clone(&self) -> Self {
936 Self {
937 row: self.row.clone(),
938 order_key: self.order_key.clone(),
939 }
940 }
941
942 #[inline]
943 fn clone_from(&mut self, source: &Self) {
944 self.row.clone_from(&source.row);
945 self.order_key.clone_from(&source.order_key);
946 }
947 }
948
949 impl Multiply<Diff> for Top1Monoid {
950 type Output = Self;
951
952 fn multiply(self, factor: &Diff) -> Self {
953 assert!(factor.is_positive());
958 self
959 }
960 }
961
962 impl Ord for Top1Monoid {
963 fn cmp(&self, other: &Self) -> Ordering {
964 debug_assert_eq!(self.order_key, other.order_key);
965
966 let left: Vec<_> = self.row.unpack();
969 let right: Vec<_> = other.row.unpack();
970 mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
971 }
972 }
973 impl PartialOrd for Top1Monoid {
974 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
975 Some(self.cmp(other))
976 }
977 }
978
979 impl Semigroup for Top1Monoid {
980 fn plus_equals(&mut self, rhs: &Self) {
981 let cmp = (*self).cmp(rhs);
982 if cmp == Ordering::Greater {
984 self.clone_from(rhs);
985 }
986 }
987 }
988
989 impl IsZero for Top1Monoid {
990 fn is_zero(&self) -> bool {
991 false
992 }
993 }
994
995 impl Columnation for Top1Monoid {
996 type InnerRegion = Top1MonoidRegion;
997 }
998
999 #[derive(Default)]
1000 pub struct Top1MonoidRegion {
1001 row_region: <Row as Columnation>::InnerRegion,
1002 order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
1003 }
1004
1005 impl Region for Top1MonoidRegion {
1006 type Item = Top1Monoid;
1007
1008 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
1009 let row = unsafe { self.row_region.copy(&item.row) };
1010 let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
1011 Self::Item { row, order_key }
1012 }
1013
1014 fn clear(&mut self) {
1015 self.row_region.clear();
1016 self.order_key_region.clear();
1017 }
1018
1019 fn reserve_items<'a, I>(&mut self, items1: I)
1020 where
1021 Self: 'a,
1022 I: Iterator<Item = &'a Self::Item> + Clone,
1023 {
1024 let items2 = items1.clone();
1025 self.row_region
1026 .reserve_items(items1.into_iter().map(|s| &s.row));
1027 self.order_key_region
1028 .reserve_items(items2.into_iter().map(|s| &s.order_key));
1029 }
1030
1031 fn reserve_regions<'a, I>(&mut self, regions1: I)
1032 where
1033 Self: 'a,
1034 I: Iterator<Item = &'a Self> + Clone,
1035 {
1036 let regions2 = regions1.clone();
1037 self.row_region
1038 .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
1039 self.order_key_region
1040 .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
1041 }
1042
1043 fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
1044 self.row_region.heap_size(&mut callback);
1045 self.order_key_region.heap_size(callback);
1046 }
1047 }
1048
1049 #[derive(Debug)]
1051 pub struct Top1MonoidShared {
1052 pub order_key: Vec<ColumnOrder>,
1053 pub left: DatumVec,
1054 pub right: DatumVec,
1055 }
1056
1057 #[derive(Debug, Clone)]
1060 pub struct Top1MonoidLocal {
1061 pub row: Row,
1062 pub shared: Rc<RefCell<Top1MonoidShared>>,
1063 }
1064
1065 impl Top1MonoidLocal {
1066 pub fn into_row(self) -> Row {
1067 self.row
1068 }
1069 }
1070
1071 impl PartialEq for Top1MonoidLocal {
1072 fn eq(&self, other: &Self) -> bool {
1073 self.row.eq(&other.row)
1074 }
1075 }
1076
1077 impl Eq for Top1MonoidLocal {}
1078
1079 impl Hash for Top1MonoidLocal {
1080 fn hash<H: Hasher>(&self, state: &mut H) {
1081 self.row.hash(state);
1082 }
1083 }
1084
1085 impl Ord for Top1MonoidLocal {
1086 fn cmp(&self, other: &Self) -> Ordering {
1087 debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
1088 let Top1MonoidShared {
1089 left,
1090 right,
1091 order_key,
1092 } = &mut *self.shared.borrow_mut();
1093
1094 let left = left.borrow_with(&self.row);
1095 let right = right.borrow_with(&other.row);
1096 mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
1097 }
1098 }
1099
1100 impl PartialOrd for Top1MonoidLocal {
1101 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1102 Some(self.cmp(other))
1103 }
1104 }
1105
1106 impl Semigroup for Top1MonoidLocal {
1107 fn plus_equals(&mut self, rhs: &Self) {
1108 let cmp = (*self).cmp(rhs);
1109 if cmp == Ordering::Greater {
1111 self.clone_from(rhs);
1112 }
1113 }
1114 }
1115
1116 impl IsZero for Top1MonoidLocal {
1117 fn is_zero(&self) -> bool {
1118 false
1119 }
1120 }
1121}