Skip to main content

mz_compute/render/
top_k.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! TopK execution logic.
11//!
12//! Consult [TopKPlan] documentation for details.
13
14use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::BatchContainer;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::dyncfgs::{ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY};
27use mz_compute_types::plan::ArrangementStrategy;
28use mz_compute_types::plan::top_k::{
29    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
30};
31use mz_expr::func::CastUint64ToInt64;
32use mz_expr::{BinaryFunc, Columns, Eval, EvalError, MirScalarExpr, UnaryFunc, func};
33use mz_ore::cast::CastFrom;
34use mz_ore::soft_assert_or_log;
35use mz_repr::fixed_length::ExtendDatums;
36use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
37use mz_timely_util::columnation::ColumnationChunker;
38use mz_timely_util::operator::CollectionExt;
39use timely::Container;
40use timely::container::{CapacityContainerBuilder, PushInto};
41use timely::dataflow::channels::pact::Pipeline;
42use timely::dataflow::operators::Operator;
43
44use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
45use crate::extensions::reduce::{ClearContainer, MzReduce};
46use crate::render::Pairer;
47use crate::render::context::{CollectionBundle, Context};
48use crate::render::errors::DataflowErrorSer;
49use crate::render::errors::MaybeValidatingRow;
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51use mz_row_spine::{
52    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
53};
54
55// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
56impl<'scope, T: crate::render::RenderTimestamp + crate::render::MaybeBucketByTime>
57    Context<'scope, T>
58{
59    pub(crate) fn render_topk(
60        &self,
61        input: CollectionBundle<'scope, T>,
62        top_k_plan: TopKPlan,
63        temporal_bucketing_strategy: ArrangementStrategy,
64    ) -> CollectionBundle<'scope, T> {
65        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);
66
67        // Bucket the per-row input stream when lowering chose `TemporalBucketing`.
68        // `TopK` builds its own arrangement(s) inside the variants below, bypassing
69        // `ensure_collections`, so the strategy is plumbed through `PlanNode::TopK`
70        // rather than inferred at the arrangement site. `apply_bucketing_strategy`
71        // is a no-op for `Direct`.
72        //
73        // Note: a `MonotonicTop1Plan`/`MonotonicTopKPlan` with `must_consolidate =
74        // false` together with `TemporalBucketing` here would mean we install a
75        // bucket operator with no downstream consolidator -- pure overhead. That
76        // combination cannot actually occur: `RelaxMustConsolidate` (which is the
77        // only writer of `must_consolidate = false`) runs only on single-time
78        // dataflows (one-shot peeks / `COPY TO`), and in single-time dataflows
79        // `ExprPrepOneShot` constant-folds `mz_now()` to the dataflow `as_of`
80        // before lowering, so no temporal predicates survive into LIR and
81        // `has_future_updates` is `false` everywhere -- meaning no operator (TopK
82        // included) is ever lowered with `TemporalBucketing`. The assertion below
83        // pins down this invariant.
84        if matches!(
85            temporal_bucketing_strategy,
86            ArrangementStrategy::TemporalBucketing
87        ) {
88            let must_consolidate = match &top_k_plan {
89                TopKPlan::MonotonicTop1(p) => p.must_consolidate,
90                TopKPlan::MonotonicTopK(p) => p.must_consolidate,
91                TopKPlan::Basic(_) => true,
92            };
93            soft_assert_or_log!(
94                must_consolidate,
95                "TopK with `TemporalBucketing` should not have `must_consolidate = false`; \
96                 `RelaxMustConsolidate` only runs on single-time dataflows where \
97                 `mz_now()` has been const-folded and no temporal bucketing is set",
98            );
99        }
100        let ok_input = if matches!(
101            temporal_bucketing_strategy,
102            ArrangementStrategy::TemporalBucketing
103        ) && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(&self.config_set)
104        {
105            let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
106                .get(&self.config_set)
107                .try_into()
108                .expect("must fit");
109            T::maybe_apply_temporal_bucketing(ok_input.inner, self.as_of_frontier.clone(), summary)
110        } else {
111            ok_input
112        };
113
114        // We create a new region to compartmentalize the topk logic.
115        let outer_scope = ok_input.scope();
116        let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| {
117            let ok_input = ok_input.enter_region(inner);
118            let mut err_collection = err_input.enter_region(inner);
119
120            // Determine if there should be errors due to limit evaluation; update `err_collection`.
121            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
122            // is an opportunity to do so for every group key instead if the error handling is
123            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
124            // adding an error output there; 2. The validating reduction on basic top-k
125            // (database-issues#7108).
126
127            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
128                None => {}
129                Some((Some(Ok(literal)), _))
130                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
131                Some((_, expr)) => {
132                    // Produce errors from limit selectors that error or are
133                    // negative, and nothing from limit selectors that do
134                    // not. Note that even if expr.could_error() is false,
135                    // the expression might still return a negative limit and
136                    // thus needs to be checked.
137                    let expr = expr.clone();
138                    let mut datum_vec = mz_repr::DatumVec::new();
139                    let errors = ok_input.clone().flat_map(move |row| {
140                        let temp_storage = mz_repr::RowArena::new();
141                        let datums = datum_vec.borrow_with(&row);
142                        match expr.eval(&datums[..], &temp_storage) {
143                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
144                                Some(EvalError::NegLimit.into())
145                            }
146                            Ok(_) => None,
147                            Err(e) => Some(e.into()),
148                        }
149                    });
150                    err_collection = err_collection.concat(errors);
151                }
152            }
153
154            let ok_result = match top_k_plan {
155                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
156                    group_key,
157                    order_key,
158                    must_consolidate,
159                }) => {
160                    let (oks, errs) = self.render_top1_monotonic(
161                        ok_input,
162                        group_key,
163                        order_key,
164                        must_consolidate,
165                    );
166                    err_collection = err_collection.concat(errs);
167                    oks
168                }
169                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
170                    order_key,
171                    group_key,
172                    arity,
173                    mut limit,
174                    must_consolidate,
175                }) => {
176                    // Must permute `limit` to reference `group_key` elements as if in order.
177                    if let Some(expr) = limit.as_mut() {
178                        let mut map = BTreeMap::new();
179                        for (index, column) in group_key.iter().enumerate() {
180                            map.insert(*column, index);
181                        }
182                        expr.permute_map(&map);
183                    }
184
185                    // Map the group key along with the row and consolidate if required to do so.
186                    let mut datum_vec = mz_repr::DatumVec::new();
187                    let ok_scope = ok_input.scope();
188                    let collection = ok_input
189                        .map(move |row| {
190                            let group_row = {
191                                let datums = datum_vec.borrow_with(&row);
192                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
193                            };
194                            (group_row, row)
195                        })
196                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
197                            must_consolidate,
198                            "Consolidated MonotonicTopK input",
199                        );
200
201                    // It should be now possible to ensure that we have a monotonic collection.
202                    let error_logger = self.error_logger();
203                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
204                        error_logger.log(
205                            "Non-monotonic input to MonotonicTopK",
206                            &format!("data={data:?}, diff={diff}"),
207                        );
208                        let m = "tried to build monotonic top-k on non-monotonic input".into();
209                        (DataflowErrorSer::from(EvalError::Internal(m)), Diff::ONE)
210                    });
211                    err_collection = err_collection.concat(errs);
212
213                    // For monotonic inputs, we are able to thin the input relation in two stages:
214                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
215                    //    being computed in a streaming fashion, even for the initial snapshot.
216                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
217                    //    any records that have been invalidated.
218                    let collection = if let Some(limit) = limit.clone() {
219                        render_intra_ts_thinning(collection, order_key.clone(), limit)
220                    } else {
221                        collection
222                    };
223
224                    let pairer = Pairer::new(1);
225                    let collection = collection.map(move |(group_row, row)| {
226                        let hash = row.hashed();
227                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
228                        (hash_key, row)
229                    });
230
231                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
232                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
233                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
234                    // these removable records are those in the input not produced in the output.
235                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
236                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
237                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
238                    // stage.
239                    let delay = std::time::Duration::from_secs(10);
240                    let (retractions_var, retractions) = SemigroupVariable::new(
241                        ok_scope,
242                        <T as crate::render::RenderTimestamp>::system_delay(
243                            delay.try_into().expect("must fit"),
244                        ),
245                    );
246                    let thinned = collection.clone().concat(retractions.negate());
247
248                    // As an additional optimization, we can skip creating the full topk hierachy
249                    // here since we now have an upper bound on the number records due to the
250                    // intra-ts thinning. The maximum number of records per timestamp is
251                    // (num_workers * limit), which we expect to be a small number and so we render
252                    // a single topk stage.
253                    let (result, errs) =
254                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
255                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
256                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
257                        result,
258                        "Monotonic TopK final consolidate",
259                    );
260                    retractions_var.set(collection.concat(result.clone().negate()));
261                    soft_assert_or_log!(
262                        errs.is_none(),
263                        "requested no validation, but received error collection"
264                    );
265
266                    result.map(|(_key_hash, row)| row)
267                }
268                TopKPlan::Basic(BasicTopKPlan {
269                    group_key,
270                    order_key,
271                    offset,
272                    mut limit,
273                    arity,
274                    buckets,
275                }) => {
276                    // Must permute `limit` to reference `group_key` elements as if in order.
277                    if let Some(expr) = limit.as_mut() {
278                        let mut map = BTreeMap::new();
279                        for (index, column) in group_key.iter().enumerate() {
280                            map.insert(*column, index);
281                        }
282                        expr.permute_map(&map);
283                    }
284
285                    let (oks, errs) = self.build_topk(
286                        ok_input, group_key, order_key, offset, limit, arity, buckets,
287                    );
288                    err_collection = err_collection.concat(errs);
289                    oks
290                }
291            };
292
293            // Extract the results from the region.
294            (
295                ok_result.leave_region(outer_scope),
296                err_collection.leave_region(outer_scope),
297            )
298        });
299
300        CollectionBundle::from_collections(ok_result, err_collection)
301    }
302
303    /// Constructs a TopK dataflow subgraph.
304    fn build_topk<'s>(
305        &self,
306        collection: VecCollection<'s, T, Row, Diff>,
307        group_key: Vec<usize>,
308        order_key: Vec<mz_expr::ColumnOrder>,
309        offset: usize,
310        limit: Option<mz_expr::MirScalarExpr>,
311        arity: usize,
312        buckets: Vec<u64>,
313    ) -> (
314        VecCollection<'s, T, Row, Diff>,
315        VecCollection<'s, T, DataflowErrorSer, Diff>,
316    ) {
317        let pairer = Pairer::new(1);
318        let mut datum_vec = mz_repr::DatumVec::new();
319        let mut collection = collection.map({
320            move |row| {
321                let group_row = {
322                    let row_hash = row.hashed();
323                    let datums = datum_vec.borrow_with(&row);
324                    let iterator = group_key.iter().map(|i| datums[*i]);
325                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
326                };
327                (group_row, row)
328            }
329        });
330
331        let mut validating = true;
332        let mut err_collection: Option<VecCollection<'s, T, _, _>> = None;
333
334        if let Some(mut limit) = limit.clone() {
335            // We may need a new `limit` that reflects the addition of `offset`.
336            // Ideally we compile it down to a literal if at all possible.
337            if offset > 0 {
338                let new_limit = (|| {
339                    let limit = limit.as_literal_int64()?;
340                    let offset = i64::try_from(offset).ok()?;
341                    limit.checked_add(offset)
342                })();
343
344                if let Some(new_limit) = new_limit {
345                    limit =
346                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
347                } else {
348                    limit = limit.call_binary(
349                        MirScalarExpr::literal_ok(
350                            Datum::UInt64(u64::cast_from(offset)),
351                            ReprScalarType::UInt64,
352                        )
353                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
354                        BinaryFunc::AddInt64(func::AddInt64),
355                    );
356                }
357            }
358
359            // These bucket values define the shifts that happen to the 64 bit hash of the
360            // record, and should have the properties that 1. there are not too many of them,
361            // and 2. each has a modest difference to the next.
362            for bucket in buckets.into_iter() {
363                // here we do not apply `offset`, but instead restrict ourself with a limit
364                // that includes the offset. We cannot apply `offset` until we perform the
365                // final, complete reduction.
366                let (oks, errs) = self.build_topk_stage(
367                    collection,
368                    order_key.clone(),
369                    bucket,
370                    0,
371                    Some(limit.clone()),
372                    arity,
373                    validating,
374                );
375                collection = oks;
376                if validating {
377                    err_collection = errs;
378                    validating = false;
379                }
380            }
381        }
382
383        // We do a final step, both to make sure that we complete the reduction, and to correctly
384        // apply `offset` to the final group, as we have not yet been applying it to the partially
385        // formed groups.
386        let (oks, errs) = self.build_topk_stage(
387            collection, order_key, 1u64, offset, limit, arity, validating,
388        );
389        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
390        let oks =
391            CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
392        collection = oks;
393        if validating {
394            err_collection = errs;
395        }
396        (
397            collection.map(|(_key_hash, row)| row),
398            err_collection.expect("at least one stage validated its inputs"),
399        )
400    }
401
402    /// To provide a robust incremental orderby-limit experience, we want to avoid grouping *all*
403    /// records (or even large groups) and then applying the ordering and limit. Instead, a more
404    /// robust approach forms groups of bounded size and applies the offset and limit to each,
405    /// and then increases the sizes of the groups.
406    ///
407    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
408    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
409    /// a larger number of arrangements when this optimization does nothing beneficial.
410    ///
411    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
412    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
413    /// output size. `arity` represents the number of columns in the input data, and
414    /// if `validating` is true, we check for negative multiplicities, which indicate
415    /// an error in the input data.
416    ///
417    /// The output of this function is _not consolidated_.
418    ///
419    /// The dataflow fragment has the following shape:
420    /// ```text
421    ///     | input
422    ///     |
423    ///   arrange
424    ///     |\
425    ///     | \
426    ///     |  reduce
427    ///     |  |
428    ///     concat
429    ///     |
430    ///     | output
431    /// ```
432    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
433    /// omitting them here for the sake of simplicity.
434    fn build_topk_stage<'s>(
435        &self,
436        collection: VecCollection<'s, T, (Row, Row), Diff>,
437        order_key: Vec<mz_expr::ColumnOrder>,
438        modulus: u64,
439        offset: usize,
440        limit: Option<mz_expr::MirScalarExpr>,
441        arity: usize,
442        validating: bool,
443    ) -> (
444        VecCollection<'s, T, (Row, Row), Diff>,
445        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
446    ) {
447        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
448        // applying `modulus`.
449        let input = collection.map(move |(hash_key, row)| {
450            let mut hash_key_iter = hash_key.iter();
451            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
452            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
453            (hash_key, row)
454        });
455
456        // If validating: demux errors, otherwise we cannot produce errors.
457        let (input, oks, errs) = if validating {
458            // Build topk stage, produce errors for invalid multiplicities.
459            let (input, stage) = build_topk_negated_stage::<
460                T,
461                RowValBuilder<_, _, _>,
462                RowValSpine<Result<Row, Row>, _, _>,
463            >(&input, order_key, offset, limit, arity);
464            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));
465
466            // Demux oks and errors.
467            let error_logger = self.error_logger();
468            type CB<C> = CapacityContainerBuilder<C>;
469            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
470                "Demuxing Errors",
471                move |(hk, result)| match result {
472                    Err(v) => {
473                        let mut hk_iter = hk.iter();
474                        let h = hk_iter.next().unwrap().unwrap_uint64();
475                        let k = SharedRow::pack(hk_iter);
476                        let message = "Negative multiplicities in TopK";
477                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
478                        Err(EvalError::Internal(message.into()).into())
479                    }
480                    Ok(t) => Ok((hk, t)),
481                },
482            );
483            (input, oks, Some(errs))
484        } else {
485            // Build non-validating topk stage.
486            let (input, stage) =
487                build_topk_negated_stage::<T, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
488                    &input, order_key, offset, limit, arity,
489                );
490            // Turn arrangement into collection.
491            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));
492
493            (input, stage, None)
494        };
495        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
496        (oks.concat(input), errs)
497    }
498
499    fn render_top1_monotonic<'s>(
500        &self,
501        collection: VecCollection<'s, T, Row, Diff>,
502        group_key: Vec<usize>,
503        order_key: Vec<mz_expr::ColumnOrder>,
504        must_consolidate: bool,
505    ) -> (
506        VecCollection<'s, T, Row, Diff>,
507        VecCollection<'s, T, DataflowErrorSer, Diff>,
508    ) {
509        // We can place our rows directly into the diff field, and only keep the relevant one
510        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
511        // reduction. We start by mapping the group key along with the row and consolidating
512        // if required to do so.
513        let collection = collection
514            .map({
515                let mut datum_vec = mz_repr::DatumVec::new();
516                move |row| {
517                    // Scoped to allow borrow of `row` to drop.
518                    let group_key = {
519                        let datums = datum_vec.borrow_with(&row);
520                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
521                    };
522                    (group_key, row)
523                }
524            })
525            .consolidate_named_if::<KeyBatcher<_, _, _>>(
526                must_consolidate,
527                "Consolidated MonotonicTop1 input",
528            );
529
530        // It should be now possible to ensure that we have a monotonic collection and process it.
531        let error_logger = self.error_logger();
532        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
533            error_logger.log(
534                "Non-monotonic input to MonotonicTop1",
535                &format!("data={data:?}, diff={diff}"),
536            );
537            let m = "tried to build monotonic top-1 on non-monotonic input".into();
538            (EvalError::Internal(m).into(), Diff::ONE)
539        });
540        let partial: KeyCollection<_, _, _> = partial
541            .explode_one(move |(group_key, row)| {
542                (
543                    group_key,
544                    monoids::Top1Monoid {
545                        row,
546                        order_key: order_key.clone(),
547                    },
548                )
549            })
550            .into();
551        let result = partial
552            .mz_arrange::<
553                ColumnationChunker<_>,
554                RowBatcher<_, _>,
555                RowBuilder<_, _>,
556                RowSpine<_, _>,
557            >(
558                "Arranged MonotonicTop1 partial [val: empty]",
559            )
560            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
561                "MonotonicTop1",
562                move |_key, input, output| {
563                    let accum: &monoids::Top1Monoid = &input[0].1;
564                    output.push((accum.row.clone(), Diff::ONE));
565                },
566            );
567        // TODO(database-issues#2288): Here we discard the arranged output.
568        (result.as_collection(|_k, v| v.to_row()), errs)
569    }
570}
571
572/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of emitted
573/// rows. This has the benefit that we have to maintain state proportionally to size of the output
574/// instead of the size of the input.
575///
576/// Returns two arrangements:
577/// * The arranged input data without modifications, and
578/// * the maintained negated output data.
579fn build_topk_negated_stage<'s, T, Bu, Tr>(
580    input: &VecCollection<'s, T, (Row, Row), Diff>,
581    order_key: Vec<mz_expr::ColumnOrder>,
582    offset: usize,
583    limit: Option<mz_expr::MirScalarExpr>,
584    arity: usize,
585) -> (
586    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
587    Arranged<'s, TraceAgent<Tr>>,
588)
589where
590    T: MzTimestamp,
591    Bu: Builder<
592            Time = T,
593            Input: Container + ClearContainer + PushInto<((Row, Tr::ValOwn), T, Diff)>,
594            Output = Tr::Batch,
595        >,
596    Tr: for<'a> Trace<
597            Key<'a> = DatumSeq<'a>,
598            KeyContainer: BatchContainer<Owned = Row>,
599            ValOwn: Data + MaybeValidatingRow<Row, Row>,
600            Time = T,
601            Diff = Diff,
602        > + 'static,
603    Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
604{
605    let mut datum_vec = mz_repr::DatumVec::new();
606
607    // We only want to arrange parts of the input that are not part of the actual output
608    // such that `input.concat(&negated_output)` yields the correct TopK
609    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
610    // built-in view mz_introspection.mz_expected_group_size_advice.
611    let arranged = input
612        .clone()
613        .mz_arrange::<
614            ColumnationChunker<_>,
615            RowRowBatcher<_, _>,
616            RowRowBuilder<_, _>,
617            RowRowSpine<_, _>,
618        >(
619            "Arranged TopK input",
620        );
621
622    // Eagerly evaluate literal limits.
623    let limit = limit.map(|l| match l.as_literal() {
624        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
625        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
626        _ => Err(l),
627    });
628
629    let reduced = arranged
630        .clone()
631        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
632            move |hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
633                // Unpack the limit, either into an integer literal or an expression to evaluate.
634                let limit = match &limit {
635                    Some(Ok(lit)) => Some(*lit),
636                    Some(Err(expr)) => {
637                        // Unpack `key` after skipping the hash and determine the limit.
638                        // If the limit errors, use a zero limit; errors are surfaced elsewhere.
639                        let temp_storage = mz_repr::RowArena::new();
640                        let mut key_datums = datum_vec.borrow();
641                        hash_key.extend_datums(&temp_storage, &mut key_datums, None);
642                        // `key_datums[0]` is the hash; the key columns follow it.
643                        let datum_limit = expr
644                            .eval(&key_datums[1..], &temp_storage)
645                            .unwrap_or(Datum::Int64(0));
646                        Some(match datum_limit {
647                            Datum::Null => Diff::MAX,
648                            d => Diff::from(d.unwrap_int64()),
649                        })
650                    }
651                    None => None,
652                };
653
654                if let Some(err) = Tr::ValOwn::into_error() {
655                    for (datums, diff) in source.iter() {
656                        if diff.is_positive() {
657                            continue;
658                        }
659                        target.push((err((*datums).to_row()), Diff::ONE));
660                        return;
661                    }
662                }
663
664                // Determine if we must actually shrink the result set.
665                let must_shrink = offset > 0
666                    || limit
667                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
668                        .unwrap_or(false);
669                if !must_shrink {
670                    return;
671                }
672
673                // First go ahead and emit all records. Note that we ensure target
674                // has the capacity to hold at least these records, and avoid any
675                // dependencies on the user-provided (potentially unbounded) limit.
676                target.reserve(source.len());
677                for (datums, diff) in source.iter() {
678                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
679                }
680                // local copies that may count down to zero.
681                let mut offset = offset;
682                let mut limit = limit;
683
684                // The order in which we should produce rows.
685                let mut indexes = (0..source.len()).collect::<Vec<_>>();
686                // We decode the datums once, into a common buffer for efficiency.
687                // Each row should contain `arity` columns; we should check that.
688                let temp_storage = mz_repr::RowArena::new();
689                let mut buffer = datum_vec.borrow();
690                for (index, (datums, _)) in source.iter().enumerate() {
691                    datums.extend_datums(&temp_storage, &mut buffer, None);
692                    assert_eq!(buffer.len(), arity * (index + 1));
693                }
694                let width = buffer.len() / source.len();
695
696                //todo: use arrangements or otherwise make the sort more performant?
697                indexes.sort_by(|left, right| {
698                    let left = &buffer[left * width..][..width];
699                    let right = &buffer[right * width..][..width];
700                    // Note: source was originally ordered by the u8 array representation
701                    // of rows, but left.cmp(right) uses Datum::cmp.
702                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
703                });
704
705                // We now need to lay out the data in order of `buffer`, but respecting
706                // the `offset` and `limit` constraints.
707                for index in indexes.into_iter() {
708                    let (datums, mut diff) = source[index];
709                    if !diff.is_positive() {
710                        continue;
711                    }
712                    // If we are still skipping early records ...
713                    if offset > 0 {
714                        let to_skip =
715                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
716                        offset -= to_skip;
717                        diff -= Diff::try_from(to_skip).unwrap();
718                    }
719                    // We should produce at most `limit` records.
720                    if let Some(limit) = &mut limit {
721                        diff = std::cmp::min(diff, Diff::from(*limit));
722                        *limit -= diff;
723                    }
724                    // Output the indicated number of rows.
725                    if diff.is_positive() {
726                        // Emit retractions for the elements actually part of
727                        // the set of TopK elements.
728                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
729                    }
730                }
731            }
732        });
733    (arranged, reduced)
734}
735
736fn render_intra_ts_thinning<'s, T>(
737    collection: VecCollection<'s, T, (Row, Row), Diff>,
738    order_key: Vec<mz_expr::ColumnOrder>,
739    limit: mz_expr::MirScalarExpr,
740) -> VecCollection<'s, T, (Row, Row), Diff>
741where
742    T: timely::progress::Timestamp + Lattice,
743{
744    let mut datum_vec = mz_repr::DatumVec::new();
745
746    let mut aggregates = BTreeMap::new();
747    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
748        order_key,
749        left: DatumVec::new(),
750        right: DatumVec::new(),
751    }));
752    collection
753        .inner
754        .unary_notify(
755            Pipeline,
756            "TopKIntraTimeThinning",
757            [],
758            move |input, output, notificator| {
759                input.for_each_time(|time, data| {
760                    let agg_time = aggregates
761                        .entry(time.time().clone())
762                        .or_insert_with(BTreeMap::new);
763                    for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
764                    {
765                        let monoid = monoids::Top1MonoidLocal {
766                            row,
767                            shared: Rc::clone(&shared),
768                        };
769
770                        // Evalute the limit, first as a constant and then against the key if needed.
771                        let limit = if let Some(l) = limit.as_literal_int64() {
772                            l
773                        } else {
774                            let temp_storage = mz_repr::RowArena::new();
775                            let key_datums = datum_vec.borrow_with(&grp_row);
776                            // Unpack `key` and determine the limit.
777                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
778                            let datum_limit = limit
779                                .eval(&key_datums, &temp_storage)
780                                .unwrap_or(mz_repr::Datum::Int64(0));
781                            if datum_limit == Datum::Null {
782                                i64::MAX
783                            } else {
784                                datum_limit.unwrap_int64()
785                            }
786                        };
787
788                        let topk = agg_time
789                            .entry((grp_row, record_time))
790                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
791                        topk.update(monoid, diff.into_inner());
792                    }
793                    notificator.notify_at(time.retain(0));
794                });
795
796                notificator.for_each(|time, _, _| {
797                    if let Some(aggs) = aggregates.remove(time.time()) {
798                        let mut session = output.session(&time);
799                        for ((grp_row, record_time), topk) in aggs {
800                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
801                                (
802                                    (grp_row.clone(), monoid.into_row()),
803                                    record_time.clone(),
804                                    diff.into(),
805                                )
806                            }))
807                        }
808                    }
809                });
810            },
811        )
812        .as_collection()
813}
814
815/// Types for in-place intra-ts aggregation of monotonic streams.
816pub mod topk_agg {
817    use differential_dataflow::consolidation;
818    use smallvec::SmallVec;
819
820    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
821    // that. It would be nice to find a way to reuse some or all of the code from there.
822    //
823    // Additionally, because we're calling into DD's consolidate method we are forced to work with
824    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
825    // for each record. It would be nice to also remove the need for cloning that piece of data
826    pub struct TopKBatch<T> {
827        updates: SmallVec<[(T, i64); 16]>,
828        clean: usize,
829        limit: i64,
830    }
831
832    impl<T: Ord> TopKBatch<T> {
833        pub fn new(limit: i64) -> Self {
834            Self {
835                updates: SmallVec::new(),
836                clean: 0,
837                limit,
838            }
839        }
840
841        /// Adds a new update, for `item` with `value`.
842        ///
843        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
844        /// half the length of the list, which would keep the total footprint within reasonable bounds
845        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
846        /// is worth paying without some experimentation.
847        #[inline]
848        pub fn update(&mut self, item: T, value: i64) {
849            self.updates.push((item, value));
850            self.maintain_bounds();
851        }
852
853        /// Compact the internal representation.
854        ///
855        /// This method sort `self.updates` and consolidates elements with equal item, discarding
856        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
857        /// elements is non-zero.
858        #[inline]
859        pub fn compact(&mut self) {
860            if self.clean < self.updates.len() && self.updates.len() > 1 {
861                let len = consolidation::consolidate_slice(&mut self.updates);
862                self.updates.truncate(len);
863
864                // We can now retain only the first K records and throw away everything else
865                let mut limit = self.limit;
866                self.updates.retain(|x| {
867                    if limit > 0 {
868                        limit -= x.1;
869                        true
870                    } else {
871                        false
872                    }
873                });
874                // By the end of the loop above `limit` will either be:
875                // (a) Positive, in which case all updates were retained;
876                // (b) Zero, in which case we discarded all updates after limit became zero;
877                // (c) Negative, in which case the last record we retained had more copies
878                // than necessary. In this latter case, we need to do one final adjustment
879                // of the diff field of the last record so that the total sum of the diffs
880                // in the batch is K.
881                if limit < 0 {
882                    if let Some(item) = self.updates.last_mut() {
883                        // We are subtracting the limit *negated*, therefore we are subtracting a value
884                        // that is *greater* than or equal to zero, which represents the excess.
885                        item.1 -= -limit;
886                    }
887                }
888            }
889            self.clean = self.updates.len();
890        }
891
892        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
893        /// This function tries to minimize work by only compacting if enough work has accumulated.
894        fn maintain_bounds(&mut self) {
895            // if we have more than 32 elements and at least half of them are not clean, compact
896            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
897                self.compact()
898            }
899        }
900    }
901
902    impl<T: Ord> IntoIterator for TopKBatch<T> {
903        type Item = (T, i64);
904        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
905
906        fn into_iter(mut self) -> Self::IntoIter {
907            self.compact();
908            self.updates.into_iter()
909        }
910    }
911}
912
913/// Monoids for in-place compaction of monotonic streams.
914pub mod monoids {
915    use std::cell::RefCell;
916    use std::cmp::Ordering;
917    use std::hash::{Hash, Hasher};
918    use std::rc::Rc;
919
920    use columnation::{Columnation, Region};
921    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
922    use mz_expr::ColumnOrder;
923    use mz_repr::{DatumVec, Diff, Row};
924    use serde::{Deserialize, Serialize};
925
926    /// A monoid containing a row and an ordering.
927    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
928    pub struct Top1Monoid {
929        pub row: Row,
930        pub order_key: Vec<ColumnOrder>,
931    }
932
933    impl Clone for Top1Monoid {
934        #[inline]
935        fn clone(&self) -> Self {
936            Self {
937                row: self.row.clone(),
938                order_key: self.order_key.clone(),
939            }
940        }
941
942        #[inline]
943        fn clone_from(&mut self, source: &Self) {
944            self.row.clone_from(&source.row);
945            self.order_key.clone_from(&source.order_key);
946        }
947    }
948
949    impl Multiply<Diff> for Top1Monoid {
950        type Output = Self;
951
952        fn multiply(self, factor: &Diff) -> Self {
953            // Multiplication in Top1Monoid is idempotent, and its
954            // users must ascertain its monotonicity beforehand
955            // (typically with ensure_monotonic) since it has no zero
956            // value for us to use here.
957            assert!(factor.is_positive());
958            self
959        }
960    }
961
962    impl Ord for Top1Monoid {
963        fn cmp(&self, other: &Self) -> Ordering {
964            debug_assert_eq!(self.order_key, other.order_key);
965
966            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
967            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
968            let left: Vec<_> = self.row.unpack();
969            let right: Vec<_> = other.row.unpack();
970            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
971        }
972    }
973    impl PartialOrd for Top1Monoid {
974        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
975            Some(self.cmp(other))
976        }
977    }
978
979    impl Semigroup for Top1Monoid {
980        fn plus_equals(&mut self, rhs: &Self) {
981            let cmp = (*self).cmp(rhs);
982            // NB: Reminder that TopK returns the _minimum_ K items.
983            if cmp == Ordering::Greater {
984                self.clone_from(rhs);
985            }
986        }
987    }
988
989    impl IsZero for Top1Monoid {
990        fn is_zero(&self) -> bool {
991            false
992        }
993    }
994
995    impl Columnation for Top1Monoid {
996        type InnerRegion = Top1MonoidRegion;
997    }
998
999    #[derive(Default)]
1000    pub struct Top1MonoidRegion {
1001        row_region: <Row as Columnation>::InnerRegion,
1002        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
1003    }
1004
1005    impl Region for Top1MonoidRegion {
1006        type Item = Top1Monoid;
1007
1008        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
1009            let row = unsafe { self.row_region.copy(&item.row) };
1010            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
1011            Self::Item { row, order_key }
1012        }
1013
1014        fn clear(&mut self) {
1015            self.row_region.clear();
1016            self.order_key_region.clear();
1017        }
1018
1019        fn reserve_items<'a, I>(&mut self, items1: I)
1020        where
1021            Self: 'a,
1022            I: Iterator<Item = &'a Self::Item> + Clone,
1023        {
1024            let items2 = items1.clone();
1025            self.row_region
1026                .reserve_items(items1.into_iter().map(|s| &s.row));
1027            self.order_key_region
1028                .reserve_items(items2.into_iter().map(|s| &s.order_key));
1029        }
1030
1031        fn reserve_regions<'a, I>(&mut self, regions1: I)
1032        where
1033            Self: 'a,
1034            I: Iterator<Item = &'a Self> + Clone,
1035        {
1036            let regions2 = regions1.clone();
1037            self.row_region
1038                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
1039            self.order_key_region
1040                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
1041        }
1042
1043        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
1044            self.row_region.heap_size(&mut callback);
1045            self.order_key_region.heap_size(callback);
1046        }
1047    }
1048
1049    /// A shared portion of a thread-local top-1 monoid implementation.
1050    #[derive(Debug)]
1051    pub struct Top1MonoidShared {
1052        pub order_key: Vec<ColumnOrder>,
1053        pub left: DatumVec,
1054        pub right: DatumVec,
1055    }
1056
1057    /// A monoid containing a row and a shared pointer to a shared structure.
1058    /// Only suitable for thread-local aggregations.
1059    #[derive(Debug, Clone)]
1060    pub struct Top1MonoidLocal {
1061        pub row: Row,
1062        pub shared: Rc<RefCell<Top1MonoidShared>>,
1063    }
1064
1065    impl Top1MonoidLocal {
1066        pub fn into_row(self) -> Row {
1067            self.row
1068        }
1069    }
1070
1071    impl PartialEq for Top1MonoidLocal {
1072        fn eq(&self, other: &Self) -> bool {
1073            self.row.eq(&other.row)
1074        }
1075    }
1076
1077    impl Eq for Top1MonoidLocal {}
1078
1079    impl Hash for Top1MonoidLocal {
1080        fn hash<H: Hasher>(&self, state: &mut H) {
1081            self.row.hash(state);
1082        }
1083    }
1084
1085    impl Ord for Top1MonoidLocal {
1086        fn cmp(&self, other: &Self) -> Ordering {
1087            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
1088            let Top1MonoidShared {
1089                left,
1090                right,
1091                order_key,
1092            } = &mut *self.shared.borrow_mut();
1093
1094            let left = left.borrow_with(&self.row);
1095            let right = right.borrow_with(&other.row);
1096            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
1097        }
1098    }
1099
1100    impl PartialOrd for Top1MonoidLocal {
1101        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1102            Some(self.cmp(other))
1103        }
1104    }
1105
1106    impl Semigroup for Top1MonoidLocal {
1107        fn plus_equals(&mut self, rhs: &Self) {
1108            let cmp = (*self).cmp(rhs);
1109            // NB: Reminder that TopK returns the _minimum_ K items.
1110            if cmp == Ordering::Greater {
1111                self.clone_from(rhs);
1112            }
1113        }
1114    }
1115
1116    impl IsZero for Top1MonoidLocal {
1117        fn is_zero(&self) -> bool {
1118            false
1119        }
1120    }
1121}