Skip to main content

mz_compute/render/
top_k.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! TopK execution logic.
11//!
12//! Consult [TopKPlan] documentation for details.
13
14use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::BatchContainer;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
27use mz_compute_types::plan::ArrangementStrategy;
28use mz_compute_types::plan::top_k::{
29    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
30};
31use mz_expr::func::CastUint64ToInt64;
32use mz_expr::{BinaryFunc, Columns, Eval, EvalError, MirScalarExpr, UnaryFunc, func};
33use mz_ore::cast::CastFrom;
34use mz_ore::soft_assert_or_log;
35use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
36use mz_timely_util::columnation::ColumnationChunker;
37use mz_timely_util::operator::CollectionExt;
38use timely::Container;
39use timely::container::{CapacityContainerBuilder, PushInto};
40use timely::dataflow::channels::pact::Pipeline;
41use timely::dataflow::operators::Operator;
42
43use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
44use crate::extensions::reduce::{ClearContainer, MzReduce};
45use crate::render::Pairer;
46use crate::render::context::{CollectionBundle, Context};
47use crate::render::errors::DataflowErrorSer;
48use crate::render::errors::MaybeValidatingRow;
49use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
50use mz_row_spine::{
51    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
52};
53
54// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
55impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime>
56    Context<'scope, T>
57{
58    pub(crate) fn render_topk(
59        &self,
60        input: CollectionBundle<'scope, T>,
61        top_k_plan: TopKPlan,
62        temporal_bucketing_strategy: ArrangementStrategy,
63    ) -> CollectionBundle<'scope, T> {
64        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);
65
66        // Bucket the per-row input stream when lowering chose `TemporalBucketing`.
67        // `TopK` builds its own arrangement(s) inside the variants below, bypassing
68        // `ensure_collections`, so the strategy is plumbed through `PlanNode::TopK`
69        // rather than inferred at the arrangement site. `apply_bucketing_strategy`
70        // is a no-op for `Direct`.
71        //
72        // Note: a `MonotonicTop1Plan`/`MonotonicTopKPlan` with `must_consolidate =
73        // false` together with `TemporalBucketing` here would mean we install a
74        // bucket operator with no downstream consolidator -- pure overhead. That
75        // combination cannot actually occur: `RelaxMustConsolidate` (which is the
76        // only writer of `must_consolidate = false`) runs only on single-time
77        // dataflows (one-shot peeks / `COPY TO`), and in single-time dataflows
78        // `ExprPrepOneShot` constant-folds `mz_now()` to the dataflow `as_of`
79        // before lowering, so no temporal predicates survive into LIR and
80        // `has_future_updates` is `false` everywhere -- meaning no operator (TopK
81        // included) is ever lowered with `TemporalBucketing`. The assertion below
82        // pins down this invariant.
83        if matches!(
84            temporal_bucketing_strategy,
85            ArrangementStrategy::TemporalBucketing
86        ) {
87            let must_consolidate = match &top_k_plan {
88                TopKPlan::MonotonicTop1(p) => p.must_consolidate,
89                TopKPlan::MonotonicTopK(p) => p.must_consolidate,
90                TopKPlan::Basic(_) => true,
91            };
92            soft_assert_or_log!(
93                must_consolidate,
94                "TopK with `TemporalBucketing` should not have `must_consolidate = false`; \
95                 `RelaxMustConsolidate` only runs on single-time dataflows where \
96                 `mz_now()` has been const-folded and no temporal bucketing is set",
97            );
98        }
99        let ok_input = if matches!(
100            temporal_bucketing_strategy,
101            ArrangementStrategy::TemporalBucketing
102        ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
103        {
104            let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
105                .get(&self.config_set)
106                .try_into()
107                .expect("must fit");
108            T::maybe_apply_temporal_bucketing(ok_input.inner, self.as_of_frontier.clone(), summary)
109        } else {
110            ok_input
111        };
112
113        // We create a new region to compartmentalize the topk logic.
114        let outer_scope = ok_input.scope();
115        let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| {
116            let ok_input = ok_input.enter_region(inner);
117            let mut err_collection = err_input.enter_region(inner);
118
119            // Determine if there should be errors due to limit evaluation; update `err_collection`.
120            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
121            // is an opportunity to do so for every group key instead if the error handling is
122            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
123            // adding an error output there; 2. The validating reduction on basic top-k
124            // (database-issues#7108).
125
126            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
127                None => {}
128                Some((Some(Ok(literal)), _))
129                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
130                Some((_, expr)) => {
131                    // Produce errors from limit selectors that error or are
132                    // negative, and nothing from limit selectors that do
133                    // not. Note that even if expr.could_error() is false,
134                    // the expression might still return a negative limit and
135                    // thus needs to be checked.
136                    let expr = expr.clone();
137                    let mut datum_vec = mz_repr::DatumVec::new();
138                    let errors = ok_input.clone().flat_map(move |row| {
139                        let temp_storage = mz_repr::RowArena::new();
140                        let datums = datum_vec.borrow_with(&row);
141                        match expr.eval(&datums[..], &temp_storage) {
142                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
143                                Some(EvalError::NegLimit.into())
144                            }
145                            Ok(_) => None,
146                            Err(e) => Some(e.into()),
147                        }
148                    });
149                    err_collection = err_collection.concat(errors);
150                }
151            }
152
153            let ok_result = match top_k_plan {
154                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
155                    group_key,
156                    order_key,
157                    must_consolidate,
158                }) => {
159                    let (oks, errs) = self.render_top1_monotonic(
160                        ok_input,
161                        group_key,
162                        order_key,
163                        must_consolidate,
164                    );
165                    err_collection = err_collection.concat(errs);
166                    oks
167                }
168                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
169                    order_key,
170                    group_key,
171                    arity,
172                    mut limit,
173                    must_consolidate,
174                }) => {
175                    // Must permute `limit` to reference `group_key` elements as if in order.
176                    if let Some(expr) = limit.as_mut() {
177                        let mut map = BTreeMap::new();
178                        for (index, column) in group_key.iter().enumerate() {
179                            map.insert(*column, index);
180                        }
181                        expr.permute_map(&map);
182                    }
183
184                    // Map the group key along with the row and consolidate if required to do so.
185                    let mut datum_vec = mz_repr::DatumVec::new();
186                    let ok_scope = ok_input.scope();
187                    let collection = ok_input
188                        .map(move |row| {
189                            let group_row = {
190                                let datums = datum_vec.borrow_with(&row);
191                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
192                            };
193                            (group_row, row)
194                        })
195                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
196                            must_consolidate,
197                            "Consolidated MonotonicTopK input",
198                        );
199
200                    // It should be now possible to ensure that we have a monotonic collection.
201                    let error_logger = self.error_logger();
202                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
203                        error_logger.log(
204                            "Non-monotonic input to MonotonicTopK",
205                            &format!("data={data:?}, diff={diff}"),
206                        );
207                        let m = "tried to build monotonic top-k on non-monotonic input".into();
208                        (DataflowErrorSer::from(EvalError::Internal(m)), Diff::ONE)
209                    });
210                    err_collection = err_collection.concat(errs);
211
212                    // For monotonic inputs, we are able to thin the input relation in two stages:
213                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
214                    //    being computed in a streaming fashion, even for the initial snapshot.
215                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
216                    //    any records that have been invalidated.
217                    let collection = if let Some(limit) = limit.clone() {
218                        render_intra_ts_thinning(collection, order_key.clone(), limit)
219                    } else {
220                        collection
221                    };
222
223                    let pairer = Pairer::new(1);
224                    let collection = collection.map(move |(group_row, row)| {
225                        let hash = row.hashed();
226                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
227                        (hash_key, row)
228                    });
229
230                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
231                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
232                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
233                    // these removable records are those in the input not produced in the output.
234                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
235                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
236                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
237                    // stage.
238                    let delay = std::time::Duration::from_secs(10);
239                    let (retractions_var, retractions) = SemigroupVariable::new(
240                        ok_scope,
241                        <T as crate::render::RenderTimestamp>::system_delay(
242                            delay.try_into().expect("must fit"),
243                        ),
244                    );
245                    let thinned = collection.clone().concat(retractions.negate());
246
247                    // As an additional optimization, we can skip creating the full topk hierachy
248                    // here since we now have an upper bound on the number records due to the
249                    // intra-ts thinning. The maximum number of records per timestamp is
250                    // (num_workers * limit), which we expect to be a small number and so we render
251                    // a single topk stage.
252                    let (result, errs) =
253                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
254                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
255                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
256                        result,
257                        "Monotonic TopK final consolidate",
258                    );
259                    retractions_var.set(collection.concat(result.clone().negate()));
260                    soft_assert_or_log!(
261                        errs.is_none(),
262                        "requested no validation, but received error collection"
263                    );
264
265                    result.map(|(_key_hash, row)| row)
266                }
267                TopKPlan::Basic(BasicTopKPlan {
268                    group_key,
269                    order_key,
270                    offset,
271                    mut limit,
272                    arity,
273                    buckets,
274                }) => {
275                    // Must permute `limit` to reference `group_key` elements as if in order.
276                    if let Some(expr) = limit.as_mut() {
277                        let mut map = BTreeMap::new();
278                        for (index, column) in group_key.iter().enumerate() {
279                            map.insert(*column, index);
280                        }
281                        expr.permute_map(&map);
282                    }
283
284                    let (oks, errs) = self.build_topk(
285                        ok_input, group_key, order_key, offset, limit, arity, buckets,
286                    );
287                    err_collection = err_collection.concat(errs);
288                    oks
289                }
290            };
291
292            // Extract the results from the region.
293            (
294                ok_result.leave_region(outer_scope),
295                err_collection.leave_region(outer_scope),
296            )
297        });
298
299        CollectionBundle::from_collections(ok_result, err_collection)
300    }
301
302    /// Constructs a TopK dataflow subgraph.
303    fn build_topk<'s>(
304        &self,
305        collection: VecCollection<'s, T, Row, Diff>,
306        group_key: Vec<usize>,
307        order_key: Vec<mz_expr::ColumnOrder>,
308        offset: usize,
309        limit: Option<mz_expr::MirScalarExpr>,
310        arity: usize,
311        buckets: Vec<u64>,
312    ) -> (
313        VecCollection<'s, T, Row, Diff>,
314        VecCollection<'s, T, DataflowErrorSer, Diff>,
315    ) {
316        let pairer = Pairer::new(1);
317        let mut datum_vec = mz_repr::DatumVec::new();
318        let mut collection = collection.map({
319            move |row| {
320                let group_row = {
321                    let row_hash = row.hashed();
322                    let datums = datum_vec.borrow_with(&row);
323                    let iterator = group_key.iter().map(|i| datums[*i]);
324                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
325                };
326                (group_row, row)
327            }
328        });
329
330        let mut validating = true;
331        let mut err_collection: Option<VecCollection<'s, T, _, _>> = None;
332
333        if let Some(mut limit) = limit.clone() {
334            // We may need a new `limit` that reflects the addition of `offset`.
335            // Ideally we compile it down to a literal if at all possible.
336            if offset > 0 {
337                let new_limit = (|| {
338                    let limit = limit.as_literal_int64()?;
339                    let offset = i64::try_from(offset).ok()?;
340                    limit.checked_add(offset)
341                })();
342
343                if let Some(new_limit) = new_limit {
344                    limit =
345                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
346                } else {
347                    limit = limit.call_binary(
348                        MirScalarExpr::literal_ok(
349                            Datum::UInt64(u64::cast_from(offset)),
350                            ReprScalarType::UInt64,
351                        )
352                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
353                        BinaryFunc::AddInt64(func::AddInt64),
354                    );
355                }
356            }
357
358            // These bucket values define the shifts that happen to the 64 bit hash of the
359            // record, and should have the properties that 1. there are not too many of them,
360            // and 2. each has a modest difference to the next.
361            for bucket in buckets.into_iter() {
362                // here we do not apply `offset`, but instead restrict ourself with a limit
363                // that includes the offset. We cannot apply `offset` until we perform the
364                // final, complete reduction.
365                let (oks, errs) = self.build_topk_stage(
366                    collection,
367                    order_key.clone(),
368                    bucket,
369                    0,
370                    Some(limit.clone()),
371                    arity,
372                    validating,
373                );
374                collection = oks;
375                if validating {
376                    err_collection = errs;
377                    validating = false;
378                }
379            }
380        }
381
382        // We do a final step, both to make sure that we complete the reduction, and to correctly
383        // apply `offset` to the final group, as we have not yet been applying it to the partially
384        // formed groups.
385        let (oks, errs) = self.build_topk_stage(
386            collection, order_key, 1u64, offset, limit, arity, validating,
387        );
388        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
389        let oks =
390            CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
391        collection = oks;
392        if validating {
393            err_collection = errs;
394        }
395        (
396            collection.map(|(_key_hash, row)| row),
397            err_collection.expect("at least one stage validated its inputs"),
398        )
399    }
400
401    /// To provide a robust incremental orderby-limit experience, we want to avoid grouping *all*
402    /// records (or even large groups) and then applying the ordering and limit. Instead, a more
403    /// robust approach forms groups of bounded size and applies the offset and limit to each,
404    /// and then increases the sizes of the groups.
405    ///
406    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
407    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
408    /// a larger number of arrangements when this optimization does nothing beneficial.
409    ///
410    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
411    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
412    /// output size. `arity` represents the number of columns in the input data, and
413    /// if `validating` is true, we check for negative multiplicities, which indicate
414    /// an error in the input data.
415    ///
416    /// The output of this function is _not consolidated_.
417    ///
418    /// The dataflow fragment has the following shape:
419    /// ```text
420    ///     | input
421    ///     |
422    ///   arrange
423    ///     |\
424    ///     | \
425    ///     |  reduce
426    ///     |  |
427    ///     concat
428    ///     |
429    ///     | output
430    /// ```
431    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
432    /// omitting them here for the sake of simplicity.
433    fn build_topk_stage<'s>(
434        &self,
435        collection: VecCollection<'s, T, (Row, Row), Diff>,
436        order_key: Vec<mz_expr::ColumnOrder>,
437        modulus: u64,
438        offset: usize,
439        limit: Option<mz_expr::MirScalarExpr>,
440        arity: usize,
441        validating: bool,
442    ) -> (
443        VecCollection<'s, T, (Row, Row), Diff>,
444        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
445    ) {
446        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
447        // applying `modulus`.
448        let input = collection.map(move |(hash_key, row)| {
449            let mut hash_key_iter = hash_key.iter();
450            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
451            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
452            (hash_key, row)
453        });
454
455        // If validating: demux errors, otherwise we cannot produce errors.
456        let (input, oks, errs) = if validating {
457            // Build topk stage, produce errors for invalid multiplicities.
458            let (input, stage) = build_topk_negated_stage::<
459                T,
460                RowValBuilder<_, _, _>,
461                RowValSpine<Result<Row, Row>, _, _>,
462            >(&input, order_key, offset, limit, arity);
463            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));
464
465            // Demux oks and errors.
466            let error_logger = self.error_logger();
467            type CB<C> = CapacityContainerBuilder<C>;
468            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
469                "Demuxing Errors",
470                move |(hk, result)| match result {
471                    Err(v) => {
472                        let mut hk_iter = hk.iter();
473                        let h = hk_iter.next().unwrap().unwrap_uint64();
474                        let k = SharedRow::pack(hk_iter);
475                        let message = "Negative multiplicities in TopK";
476                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
477                        Err(EvalError::Internal(message.into()).into())
478                    }
479                    Ok(t) => Ok((hk, t)),
480                },
481            );
482            (input, oks, Some(errs))
483        } else {
484            // Build non-validating topk stage.
485            let (input, stage) =
486                build_topk_negated_stage::<T, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
487                    &input, order_key, offset, limit, arity,
488                );
489            // Turn arrangement into collection.
490            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));
491
492            (input, stage, None)
493        };
494        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
495        (oks.concat(input), errs)
496    }
497
498    fn render_top1_monotonic<'s>(
499        &self,
500        collection: VecCollection<'s, T, Row, Diff>,
501        group_key: Vec<usize>,
502        order_key: Vec<mz_expr::ColumnOrder>,
503        must_consolidate: bool,
504    ) -> (
505        VecCollection<'s, T, Row, Diff>,
506        VecCollection<'s, T, DataflowErrorSer, Diff>,
507    ) {
508        // We can place our rows directly into the diff field, and only keep the relevant one
509        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
510        // reduction. We start by mapping the group key along with the row and consolidating
511        // if required to do so.
512        let collection = collection
513            .map({
514                let mut datum_vec = mz_repr::DatumVec::new();
515                move |row| {
516                    // Scoped to allow borrow of `row` to drop.
517                    let group_key = {
518                        let datums = datum_vec.borrow_with(&row);
519                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
520                    };
521                    (group_key, row)
522                }
523            })
524            .consolidate_named_if::<KeyBatcher<_, _, _>>(
525                must_consolidate,
526                "Consolidated MonotonicTop1 input",
527            );
528
529        // It should be now possible to ensure that we have a monotonic collection and process it.
530        let error_logger = self.error_logger();
531        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
532            error_logger.log(
533                "Non-monotonic input to MonotonicTop1",
534                &format!("data={data:?}, diff={diff}"),
535            );
536            let m = "tried to build monotonic top-1 on non-monotonic input".into();
537            (EvalError::Internal(m).into(), Diff::ONE)
538        });
539        let partial: KeyCollection<_, _, _> = partial
540            .explode_one(move |(group_key, row)| {
541                (
542                    group_key,
543                    monoids::Top1Monoid {
544                        row,
545                        order_key: order_key.clone(),
546                    },
547                )
548            })
549            .into();
550        let result = partial
551            .mz_arrange::<
552                ColumnationChunker<_>,
553                RowBatcher<_, _>,
554                RowBuilder<_, _>,
555                RowSpine<_, _>,
556            >(
557                "Arranged MonotonicTop1 partial [val: empty]",
558            )
559            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
560                "MonotonicTop1",
561                move |_key, input, output| {
562                    let accum: &monoids::Top1Monoid = &input[0].1;
563                    output.push((accum.row.clone(), Diff::ONE));
564                },
565            );
566        // TODO(database-issues#2288): Here we discard the arranged output.
567        (result.as_collection(|_k, v| v.to_row()), errs)
568    }
569}
570
571/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of emitted
572/// rows. This has the benefit that we have to maintain state proportionally to size of the output
573/// instead of the size of the input.
574///
575/// Returns two arrangements:
576/// * The arranged input data without modifications, and
577/// * the maintained negated output data.
578fn build_topk_negated_stage<'s, T, Bu, Tr>(
579    input: &VecCollection<'s, T, (Row, Row), Diff>,
580    order_key: Vec<mz_expr::ColumnOrder>,
581    offset: usize,
582    limit: Option<mz_expr::MirScalarExpr>,
583    arity: usize,
584) -> (
585    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
586    Arranged<'s, TraceAgent<Tr>>,
587)
588where
589    T: MzTimestamp,
590    Bu: Builder<
591            Time = T,
592            Input: Container + ClearContainer + PushInto<((Row, Tr::ValOwn), T, Diff)>,
593            Output = Tr::Batch,
594        >,
595    Tr: for<'a> Trace<
596            Key<'a> = DatumSeq<'a>,
597            KeyContainer: BatchContainer<Owned = Row>,
598            ValOwn: Data + MaybeValidatingRow<Row, Row>,
599            Time = T,
600            Diff = Diff,
601        > + 'static,
602    Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
603{
604    let mut datum_vec = mz_repr::DatumVec::new();
605
606    // We only want to arrange parts of the input that are not part of the actual output
607    // such that `input.concat(&negated_output)` yields the correct TopK
608    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
609    // built-in view mz_introspection.mz_expected_group_size_advice.
610    let arranged = input
611        .clone()
612        .mz_arrange::<
613            ColumnationChunker<_>,
614            RowRowBatcher<_, _>,
615            RowRowBuilder<_, _>,
616            RowRowSpine<_, _>,
617        >(
618            "Arranged TopK input",
619        );
620
621    // Eagerly evaluate literal limits.
622    let limit = limit.map(|l| match l.as_literal() {
623        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
624        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
625        _ => Err(l),
626    });
627
628    let reduced = arranged
629        .clone()
630        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
631            move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
632                // Unpack the limit, either into an integer literal or an expression to evaluate.
633                let limit = match &limit {
634                    Some(Ok(lit)) => Some(*lit),
635                    Some(Err(expr)) => {
636                        // Unpack `key` after skipping the hash and determine the limit.
637                        // If the limit errors, use a zero limit; errors are surfaced elsewhere.
638                        let temp_storage = mz_repr::RowArena::new();
639                        let _hash = hash_key.next();
640                        let mut key_datums = datum_vec.borrow();
641                        key_datums.extend(hash_key);
642                        let datum_limit = expr
643                            .eval(&key_datums, &temp_storage)
644                            .unwrap_or(Datum::Int64(0));
645                        Some(match datum_limit {
646                            Datum::Null => Diff::MAX,
647                            d => Diff::from(d.unwrap_int64()),
648                        })
649                    }
650                    None => None,
651                };
652
653                if let Some(err) = Tr::ValOwn::into_error() {
654                    for (datums, diff) in source.iter() {
655                        if diff.is_positive() {
656                            continue;
657                        }
658                        target.push((err((*datums).to_row()), Diff::ONE));
659                        return;
660                    }
661                }
662
663                // Determine if we must actually shrink the result set.
664                let must_shrink = offset > 0
665                    || limit
666                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
667                        .unwrap_or(false);
668                if !must_shrink {
669                    return;
670                }
671
672                // First go ahead and emit all records. Note that we ensure target
673                // has the capacity to hold at least these records, and avoid any
674                // dependencies on the user-provided (potentially unbounded) limit.
675                target.reserve(source.len());
676                for (datums, diff) in source.iter() {
677                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
678                }
679                // local copies that may count down to zero.
680                let mut offset = offset;
681                let mut limit = limit;
682
683                // The order in which we should produce rows.
684                let mut indexes = (0..source.len()).collect::<Vec<_>>();
685                // We decode the datums once, into a common buffer for efficiency.
686                // Each row should contain `arity` columns; we should check that.
687                let mut buffer = datum_vec.borrow();
688                for (index, (datums, _)) in source.iter().enumerate() {
689                    buffer.extend(*datums);
690                    assert_eq!(buffer.len(), arity * (index + 1));
691                }
692                let width = buffer.len() / source.len();
693
694                //todo: use arrangements or otherwise make the sort more performant?
695                indexes.sort_by(|left, right| {
696                    let left = &buffer[left * width..][..width];
697                    let right = &buffer[right * width..][..width];
698                    // Note: source was originally ordered by the u8 array representation
699                    // of rows, but left.cmp(right) uses Datum::cmp.
700                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
701                });
702
703                // We now need to lay out the data in order of `buffer`, but respecting
704                // the `offset` and `limit` constraints.
705                for index in indexes.into_iter() {
706                    let (datums, mut diff) = source[index];
707                    if !diff.is_positive() {
708                        continue;
709                    }
710                    // If we are still skipping early records ...
711                    if offset > 0 {
712                        let to_skip =
713                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
714                        offset -= to_skip;
715                        diff -= Diff::try_from(to_skip).unwrap();
716                    }
717                    // We should produce at most `limit` records.
718                    if let Some(limit) = &mut limit {
719                        diff = std::cmp::min(diff, Diff::from(*limit));
720                        *limit -= diff;
721                    }
722                    // Output the indicated number of rows.
723                    if diff.is_positive() {
724                        // Emit retractions for the elements actually part of
725                        // the set of TopK elements.
726                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
727                    }
728                }
729            }
730        });
731    (arranged, reduced)
732}
733
734fn render_intra_ts_thinning<'s, T>(
735    collection: VecCollection<'s, T, (Row, Row), Diff>,
736    order_key: Vec<mz_expr::ColumnOrder>,
737    limit: mz_expr::MirScalarExpr,
738) -> VecCollection<'s, T, (Row, Row), Diff>
739where
740    T: timely::progress::Timestamp + Lattice,
741{
742    let mut datum_vec = mz_repr::DatumVec::new();
743
744    let mut aggregates = BTreeMap::new();
745    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
746        order_key,
747        left: DatumVec::new(),
748        right: DatumVec::new(),
749    }));
750    collection
751        .inner
752        .unary_notify(
753            Pipeline,
754            "TopKIntraTimeThinning",
755            [],
756            move |input, output, notificator| {
757                input.for_each_time(|time, data| {
758                    let agg_time = aggregates
759                        .entry(time.time().clone())
760                        .or_insert_with(BTreeMap::new);
761                    for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
762                    {
763                        let monoid = monoids::Top1MonoidLocal {
764                            row,
765                            shared: Rc::clone(&shared),
766                        };
767
768                        // Evalute the limit, first as a constant and then against the key if needed.
769                        let limit = if let Some(l) = limit.as_literal_int64() {
770                            l
771                        } else {
772                            let temp_storage = mz_repr::RowArena::new();
773                            let key_datums = datum_vec.borrow_with(&grp_row);
774                            // Unpack `key` and determine the limit.
775                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
776                            let datum_limit = limit
777                                .eval(&key_datums, &temp_storage)
778                                .unwrap_or(mz_repr::Datum::Int64(0));
779                            if datum_limit == Datum::Null {
780                                i64::MAX
781                            } else {
782                                datum_limit.unwrap_int64()
783                            }
784                        };
785
786                        let topk = agg_time
787                            .entry((grp_row, record_time))
788                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
789                        topk.update(monoid, diff.into_inner());
790                    }
791                    notificator.notify_at(time.retain(0));
792                });
793
794                notificator.for_each(|time, _, _| {
795                    if let Some(aggs) = aggregates.remove(time.time()) {
796                        let mut session = output.session(&time);
797                        for ((grp_row, record_time), topk) in aggs {
798                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
799                                (
800                                    (grp_row.clone(), monoid.into_row()),
801                                    record_time.clone(),
802                                    diff.into(),
803                                )
804                            }))
805                        }
806                    }
807                });
808            },
809        )
810        .as_collection()
811}
812
813/// Types for in-place intra-ts aggregation of monotonic streams.
814pub mod topk_agg {
815    use differential_dataflow::consolidation;
816    use smallvec::SmallVec;
817
    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
    // that. It would be nice to find a way to reuse some or all of the code from there.
    //
    // Additionally, because we're calling into DD's consolidate method we are forced to work with
    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
    // for each record. It would be nice to also remove the need for cloning that piece of data.

    /// A batch of `(item, diff)` updates that is trimmed to the first `limit`
    /// records (in `T`'s order) whenever it is compacted.
    pub struct TopKBatch<T> {
        /// The buffered updates; new updates are appended at the end.
        updates: SmallVec<[(T, i64); 16]>,
        /// Length of `updates` right after the most recent compaction; anything
        /// beyond this index has been pushed since then.
        clean: usize,
        /// Number of records to keep when compacting.
        limit: i64,
    }
829
830    impl<T: Ord> TopKBatch<T> {
831        pub fn new(limit: i64) -> Self {
832            Self {
833                updates: SmallVec::new(),
834                clean: 0,
835                limit,
836            }
837        }
838
839        /// Adds a new update, for `item` with `value`.
840        ///
841        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
842        /// half the length of the list, which would keep the total footprint within reasonable bounds
843        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
844        /// is worth paying without some experimentation.
845        #[inline]
846        pub fn update(&mut self, item: T, value: i64) {
847            self.updates.push((item, value));
848            self.maintain_bounds();
849        }
850
851        /// Compact the internal representation.
852        ///
853        /// This method sort `self.updates` and consolidates elements with equal item, discarding
854        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
855        /// elements is non-zero.
856        #[inline]
857        pub fn compact(&mut self) {
858            if self.clean < self.updates.len() && self.updates.len() > 1 {
859                let len = consolidation::consolidate_slice(&mut self.updates);
860                self.updates.truncate(len);
861
862                // We can now retain only the first K records and throw away everything else
863                let mut limit = self.limit;
864                self.updates.retain(|x| {
865                    if limit > 0 {
866                        limit -= x.1;
867                        true
868                    } else {
869                        false
870                    }
871                });
872                // By the end of the loop above `limit` will either be:
873                // (a) Positive, in which case all updates were retained;
874                // (b) Zero, in which case we discarded all updates after limit became zero;
875                // (c) Negative, in which case the last record we retained had more copies
876                // than necessary. In this latter case, we need to do one final adjustment
877                // of the diff field of the last record so that the total sum of the diffs
878                // in the batch is K.
879                if limit < 0 {
880                    if let Some(item) = self.updates.last_mut() {
881                        // We are subtracting the limit *negated*, therefore we are subtracting a value
882                        // that is *greater* than or equal to zero, which represents the excess.
883                        item.1 -= -limit;
884                    }
885                }
886            }
887            self.clean = self.updates.len();
888        }
889
890        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
891        /// This function tries to minimize work by only compacting if enough work has accumulated.
892        fn maintain_bounds(&mut self) {
893            // if we have more than 32 elements and at least half of them are not clean, compact
894            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
895                self.compact()
896            }
897        }
898    }
899
    /// Consumes the batch and yields its `(item, diff)` updates.
    impl<T: Ord> IntoIterator for TopKBatch<T> {
        type Item = (T, i64);
        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;

        /// Runs a final `compact` so that updates pushed since the last
        /// compaction are processed before anything is handed out.
        fn into_iter(mut self) -> Self::IntoIter {
            self.compact();
            self.updates.into_iter()
        }
    }
909}
910
911/// Monoids for in-place compaction of monotonic streams.
912pub mod monoids {
913    use std::cell::RefCell;
914    use std::cmp::Ordering;
915    use std::hash::{Hash, Hasher};
916    use std::rc::Rc;
917
918    use columnation::{Columnation, Region};
919    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
920    use mz_expr::ColumnOrder;
921    use mz_repr::{DatumVec, Diff, Row};
922    use serde::{Deserialize, Serialize};
923
    /// A monoid containing a row and an ordering.
    ///
    /// Values are ordered by comparing their rows under `order_key`, and
    /// combining two values keeps the one that sorts first.
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        /// The row carried by this value.
        pub row: Row,
        /// The column ordering used to compare `row` against other rows.
        pub order_key: Vec<ColumnOrder>,
    }
930
931    impl Clone for Top1Monoid {
932        #[inline]
933        fn clone(&self) -> Self {
934            Self {
935                row: self.row.clone(),
936                order_key: self.order_key.clone(),
937            }
938        }
939
940        #[inline]
941        fn clone_from(&mut self, source: &Self) {
942            self.row.clone_from(&source.row);
943            self.order_key.clone_from(&source.order_key);
944        }
945    }
946
    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        /// Returns `self` unchanged for any positive `factor`.
        ///
        /// # Panics
        ///
        /// Panics if `factor` is not positive.
        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in Top1Monoid is idempotent, and its
            // users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }
959
960    impl Ord for Top1Monoid {
961        fn cmp(&self, other: &Self) -> Ordering {
962            debug_assert_eq!(self.order_key, other.order_key);
963
964            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
965            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
966            let left: Vec<_> = self.row.unpack();
967            let right: Vec<_> = other.row.unpack();
968            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
969        }
970    }
971    impl PartialOrd for Top1Monoid {
972        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
973            Some(self.cmp(other))
974        }
975    }
976
977    impl Semigroup for Top1Monoid {
978        fn plus_equals(&mut self, rhs: &Self) {
979            let cmp = (*self).cmp(rhs);
980            // NB: Reminder that TopK returns the _minimum_ K items.
981            if cmp == Ordering::Greater {
982                self.clone_from(rhs);
983            }
984        }
985    }
986
    impl IsZero for Top1Monoid {
        /// Never reports zero; every value is treated as non-zero.
        fn is_zero(&self) -> bool {
            false
        }
    }
992
    /// Columnar storage for `Top1Monoid` is provided by `Top1MonoidRegion`.
    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }
996
    /// A region for `Top1Monoid` values, made up of one region per field.
    #[derive(Default)]
    pub struct Top1MonoidRegion {
        /// Region backing the `row` field.
        row_region: <Row as Columnation>::InnerRegion,
        /// Region backing the `order_key` field.
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }
1002
1003    impl Region for Top1MonoidRegion {
1004        type Item = Top1Monoid;
1005
1006        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
1007            let row = unsafe { self.row_region.copy(&item.row) };
1008            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
1009            Self::Item { row, order_key }
1010        }
1011
1012        fn clear(&mut self) {
1013            self.row_region.clear();
1014            self.order_key_region.clear();
1015        }
1016
1017        fn reserve_items<'a, I>(&mut self, items1: I)
1018        where
1019            Self: 'a,
1020            I: Iterator<Item = &'a Self::Item> + Clone,
1021        {
1022            let items2 = items1.clone();
1023            self.row_region
1024                .reserve_items(items1.into_iter().map(|s| &s.row));
1025            self.order_key_region
1026                .reserve_items(items2.into_iter().map(|s| &s.order_key));
1027        }
1028
1029        fn reserve_regions<'a, I>(&mut self, regions1: I)
1030        where
1031            Self: 'a,
1032            I: Iterator<Item = &'a Self> + Clone,
1033        {
1034            let regions2 = regions1.clone();
1035            self.row_region
1036                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
1037            self.order_key_region
1038                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
1039        }
1040
1041        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
1042            self.row_region.heap_size(&mut callback);
1043            self.order_key_region.heap_size(callback);
1044        }
1045    }
1046
    /// A shared portion of a thread-local top-1 monoid implementation.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        /// The column ordering used when comparing rows.
        pub order_key: Vec<ColumnOrder>,
        /// Scratch buffer for the decoded datums of the left-hand row of a comparison.
        pub left: DatumVec,
        /// Scratch buffer for the decoded datums of the right-hand row of a comparison.
        pub right: DatumVec,
    }
1054
    /// A monoid containing a row and a shared pointer to a shared structure.
    /// Only suitable for thread-local aggregations.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        /// The row carried by this value.
        pub row: Row,
        /// The order key and decoding buffers used when comparing values.
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }
1062
    impl Top1MonoidLocal {
        /// Consumes the monoid and returns its row, dropping the handle to the
        /// shared state.
        pub fn into_row(self) -> Row {
            self.row
        }
    }
1068
1069    impl PartialEq for Top1MonoidLocal {
1070        fn eq(&self, other: &Self) -> bool {
1071            self.row.eq(&other.row)
1072        }
1073    }
1074
1075    impl Eq for Top1MonoidLocal {}
1076
    impl Hash for Top1MonoidLocal {
        /// Hashes only the row; the shared state does not contribute to the hash.
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }
1082
    impl Ord for Top1MonoidLocal {
        /// Orders two values by their rows under the shared `order_key`, falling
        /// back to a comparison of the full decoded rows to break ties.
        ///
        /// The shared state is mutably borrowed for the duration of the call;
        /// `RefCell::borrow_mut` panics if it is already borrowed.
        fn cmp(&self, other: &Self) -> Ordering {
            // Both operands are expected to use the same shared state; this is only
            // checked in debug builds, and `self`'s shared state is the one used.
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            // Decode both rows into the shared scratch buffers.
            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        /// Always `Some`: delegates to [`Ord::cmp`].
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
1103
1104    impl Semigroup for Top1MonoidLocal {
1105        fn plus_equals(&mut self, rhs: &Self) {
1106            let cmp = (*self).cmp(rhs);
1107            // NB: Reminder that TopK returns the _minimum_ K items.
1108            if cmp == Ordering::Greater {
1109                self.clone_from(rhs);
1110            }
1111        }
1112    }
1113
    impl IsZero for Top1MonoidLocal {
        /// Never reports zero; every value is treated as non-zero.
        fn is_zero(&self) -> bool {
            false
        }
    }
1119}