// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! TopK execution logic.
//!
//! Consult [TopKPlan] documentation for details.
14use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::BatchContainer;
24use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
25use differential_dataflow::trace::{Builder, Trace};
26use differential_dataflow::{Data, VecCollection};
27use mz_compute_types::plan::top_k::{
28    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
29};
30use mz_expr::func::CastUint64ToInt64;
31use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
32use mz_ore::cast::CastFrom;
33use mz_ore::soft_assert_or_log;
34use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
35use mz_storage_types::errors::DataflowError;
36use mz_timely_util::operator::CollectionExt;
37use timely::Container;
38use timely::container::{CapacityContainerBuilder, PushInto};
39use timely::dataflow::channels::pact::Pipeline;
40use timely::dataflow::operators::Operator;
41
42use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
43use crate::extensions::reduce::{ClearContainer, MzReduce};
44use crate::render::Pairer;
45use crate::render::context::{CollectionBundle, Context};
46use crate::render::errors::MaybeValidatingRow;
47use crate::row_spine::{
48    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
49};
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51
52// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
53impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> {
54    pub(crate) fn render_topk(
55        &self,
56        input: CollectionBundle<'scope, T>,
57        top_k_plan: TopKPlan,
58    ) -> CollectionBundle<'scope, T> {
59        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);
60
61        // We create a new region to compartmentalize the topk logic.
62        let outer_scope = ok_input.scope();
63        let (ok_result, err_collection) = outer_scope.clone().region_named("TopK", |inner| {
64            let ok_input = ok_input.enter_region(inner);
65            let mut err_collection = err_input.enter_region(inner);
66
67            // Determine if there should be errors due to limit evaluation; update `err_collection`.
68            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
69            // is an opportunity to do so for every group key instead if the error handling is
70            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
71            // adding an error output there; 2. The validating reduction on basic top-k
72            // (database-issues#7108).
73
74            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
75                None => {}
76                Some((Some(Ok(literal)), _))
77                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
78                Some((_, expr)) => {
79                    // Produce errors from limit selectors that error or are
80                    // negative, and nothing from limit selectors that do
81                    // not. Note that even if expr.could_error() is false,
82                    // the expression might still return a negative limit and
83                    // thus needs to be checked.
84                    let expr = expr.clone();
85                    let mut datum_vec = mz_repr::DatumVec::new();
86                    let errors = ok_input.clone().flat_map(move |row| {
87                        let temp_storage = mz_repr::RowArena::new();
88                        let datums = datum_vec.borrow_with(&row);
89                        match expr.eval(&datums[..], &temp_storage) {
90                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
91                                Some(EvalError::NegLimit.into())
92                            }
93                            Ok(_) => None,
94                            Err(e) => Some(e.into()),
95                        }
96                    });
97                    err_collection = err_collection.concat(errors);
98                }
99            }
100
101            let ok_result = match top_k_plan {
102                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
103                    group_key,
104                    order_key,
105                    must_consolidate,
106                }) => {
107                    let (oks, errs) = self.render_top1_monotonic(
108                        ok_input,
109                        group_key,
110                        order_key,
111                        must_consolidate,
112                    );
113                    err_collection = err_collection.concat(errs);
114                    oks
115                }
116                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
117                    order_key,
118                    group_key,
119                    arity,
120                    mut limit,
121                    must_consolidate,
122                }) => {
123                    // Must permute `limit` to reference `group_key` elements as if in order.
124                    if let Some(expr) = limit.as_mut() {
125                        let mut map = BTreeMap::new();
126                        for (index, column) in group_key.iter().enumerate() {
127                            map.insert(*column, index);
128                        }
129                        expr.permute_map(&map);
130                    }
131
132                    // Map the group key along with the row and consolidate if required to do so.
133                    let mut datum_vec = mz_repr::DatumVec::new();
134                    let ok_scope = ok_input.scope();
135                    let collection = ok_input
136                        .map(move |row| {
137                            let group_row = {
138                                let datums = datum_vec.borrow_with(&row);
139                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
140                            };
141                            (group_row, row)
142                        })
143                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
144                            must_consolidate,
145                            "Consolidated MonotonicTopK input",
146                        );
147
148                    // It should be now possible to ensure that we have a monotonic collection.
149                    let error_logger = self.error_logger();
150                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
151                        error_logger.log(
152                            "Non-monotonic input to MonotonicTopK",
153                            &format!("data={data:?}, diff={diff}"),
154                        );
155                        let m = "tried to build monotonic top-k on non-monotonic input".into();
156                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
157                    });
158                    err_collection = err_collection.concat(errs);
159
160                    // For monotonic inputs, we are able to thin the input relation in two stages:
161                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
162                    //    being computed in a streaming fashion, even for the initial snapshot.
163                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
164                    //    any records that have been invalidated.
165                    let collection = if let Some(limit) = limit.clone() {
166                        render_intra_ts_thinning(collection, order_key.clone(), limit)
167                    } else {
168                        collection
169                    };
170
171                    let pairer = Pairer::new(1);
172                    let collection = collection.map(move |(group_row, row)| {
173                        let hash = row.hashed();
174                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
175                        (hash_key, row)
176                    });
177
178                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
179                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
180                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
181                    // these removable records are those in the input not produced in the output.
182                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
183                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
184                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
185                    // stage.
186                    let delay = std::time::Duration::from_secs(10);
187                    let (retractions_var, retractions) = SemigroupVariable::new(
188                        ok_scope,
189                        <T as crate::render::RenderTimestamp>::system_delay(
190                            delay.try_into().expect("must fit"),
191                        ),
192                    );
193                    let thinned = collection.clone().concat(retractions.negate());
194
195                    // As an additional optimization, we can skip creating the full topk hierachy
196                    // here since we now have an upper bound on the number records due to the
197                    // intra-ts thinning. The maximum number of records per timestamp is
198                    // (num_workers * limit), which we expect to be a small number and so we render
199                    // a single topk stage.
200                    let (result, errs) =
201                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
202                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
203                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
204                        result,
205                        "Monotonic TopK final consolidate",
206                    );
207                    retractions_var.set(collection.concat(result.clone().negate()));
208                    soft_assert_or_log!(
209                        errs.is_none(),
210                        "requested no validation, but received error collection"
211                    );
212
213                    result.map(|(_key_hash, row)| row)
214                }
215                TopKPlan::Basic(BasicTopKPlan {
216                    group_key,
217                    order_key,
218                    offset,
219                    mut limit,
220                    arity,
221                    buckets,
222                }) => {
223                    // Must permute `limit` to reference `group_key` elements as if in order.
224                    if let Some(expr) = limit.as_mut() {
225                        let mut map = BTreeMap::new();
226                        for (index, column) in group_key.iter().enumerate() {
227                            map.insert(*column, index);
228                        }
229                        expr.permute_map(&map);
230                    }
231
232                    let (oks, errs) = self.build_topk(
233                        ok_input, group_key, order_key, offset, limit, arity, buckets,
234                    );
235                    err_collection = err_collection.concat(errs);
236                    oks
237                }
238            };
239
240            // Extract the results from the region.
241            (
242                ok_result.leave_region(outer_scope),
243                err_collection.leave_region(outer_scope),
244            )
245        });
246
247        CollectionBundle::from_collections(ok_result, err_collection)
248    }
249
250    /// Constructs a TopK dataflow subgraph.
251    fn build_topk<'s>(
252        &self,
253        collection: VecCollection<'s, T, Row, Diff>,
254        group_key: Vec<usize>,
255        order_key: Vec<mz_expr::ColumnOrder>,
256        offset: usize,
257        limit: Option<mz_expr::MirScalarExpr>,
258        arity: usize,
259        buckets: Vec<u64>,
260    ) -> (
261        VecCollection<'s, T, Row, Diff>,
262        VecCollection<'s, T, DataflowError, Diff>,
263    ) {
264        let pairer = Pairer::new(1);
265        let mut datum_vec = mz_repr::DatumVec::new();
266        let mut collection = collection.map({
267            move |row| {
268                let group_row = {
269                    let row_hash = row.hashed();
270                    let datums = datum_vec.borrow_with(&row);
271                    let iterator = group_key.iter().map(|i| datums[*i]);
272                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
273                };
274                (group_row, row)
275            }
276        });
277
278        let mut validating = true;
279        let mut err_collection: Option<VecCollection<'s, T, _, _>> = None;
280
281        if let Some(mut limit) = limit.clone() {
282            // We may need a new `limit` that reflects the addition of `offset`.
283            // Ideally we compile it down to a literal if at all possible.
284            if offset > 0 {
285                let new_limit = (|| {
286                    let limit = limit.as_literal_int64()?;
287                    let offset = i64::try_from(offset).ok()?;
288                    limit.checked_add(offset)
289                })();
290
291                if let Some(new_limit) = new_limit {
292                    limit =
293                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
294                } else {
295                    limit = limit.call_binary(
296                        MirScalarExpr::literal_ok(
297                            Datum::UInt64(u64::cast_from(offset)),
298                            ReprScalarType::UInt64,
299                        )
300                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
301                        BinaryFunc::AddInt64(func::AddInt64),
302                    );
303                }
304            }
305
306            // These bucket values define the shifts that happen to the 64 bit hash of the
307            // record, and should have the properties that 1. there are not too many of them,
308            // and 2. each has a modest difference to the next.
309            for bucket in buckets.into_iter() {
310                // here we do not apply `offset`, but instead restrict ourself with a limit
311                // that includes the offset. We cannot apply `offset` until we perform the
312                // final, complete reduction.
313                let (oks, errs) = self.build_topk_stage(
314                    collection,
315                    order_key.clone(),
316                    bucket,
317                    0,
318                    Some(limit.clone()),
319                    arity,
320                    validating,
321                );
322                collection = oks;
323                if validating {
324                    err_collection = errs;
325                    validating = false;
326                }
327            }
328        }
329
330        // We do a final step, both to make sure that we complete the reduction, and to correctly
331        // apply `offset` to the final group, as we have not yet been applying it to the partially
332        // formed groups.
333        let (oks, errs) = self.build_topk_stage(
334            collection, order_key, 1u64, offset, limit, arity, validating,
335        );
336        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
337        let oks =
338            CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
339        collection = oks;
340        if validating {
341            err_collection = errs;
342        }
343        (
344            collection.map(|(_key_hash, row)| row),
345            err_collection.expect("at least one stage validated its inputs"),
346        )
347    }
348
349    /// To provide a robust incremental orderby-limit experience, we want to avoid grouping *all*
350    /// records (or even large groups) and then applying the ordering and limit. Instead, a more
351    /// robust approach forms groups of bounded size and applies the offset and limit to each,
352    /// and then increases the sizes of the groups.
353    ///
354    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
355    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
356    /// a larger number of arrangements when this optimization does nothing beneficial.
357    ///
358    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
359    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
360    /// output size. `arity` represents the number of columns in the input data, and
361    /// if `validating` is true, we check for negative multiplicities, which indicate
362    /// an error in the input data.
363    ///
364    /// The output of this function is _not consolidated_.
365    ///
366    /// The dataflow fragment has the following shape:
367    /// ```text
368    ///     | input
369    ///     |
370    ///   arrange
371    ///     |\
372    ///     | \
373    ///     |  reduce
374    ///     |  |
375    ///     concat
376    ///     |
377    ///     | output
378    /// ```
379    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
380    /// omitting them here for the sake of simplicity.
381    fn build_topk_stage<'s>(
382        &self,
383        collection: VecCollection<'s, T, (Row, Row), Diff>,
384        order_key: Vec<mz_expr::ColumnOrder>,
385        modulus: u64,
386        offset: usize,
387        limit: Option<mz_expr::MirScalarExpr>,
388        arity: usize,
389        validating: bool,
390    ) -> (
391        VecCollection<'s, T, (Row, Row), Diff>,
392        Option<VecCollection<'s, T, DataflowError, Diff>>,
393    ) {
394        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
395        // applying `modulus`.
396        let input = collection.map(move |(hash_key, row)| {
397            let mut hash_key_iter = hash_key.iter();
398            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
399            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
400            (hash_key, row)
401        });
402
403        // If validating: demux errors, otherwise we cannot produce errors.
404        let (input, oks, errs) = if validating {
405            // Build topk stage, produce errors for invalid multiplicities.
406            let (input, stage) = build_topk_negated_stage::<
407                T,
408                RowValBuilder<_, _, _>,
409                RowValSpine<Result<Row, Row>, _, _>,
410            >(&input, order_key, offset, limit, arity);
411            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));
412
413            // Demux oks and errors.
414            let error_logger = self.error_logger();
415            type CB<C> = CapacityContainerBuilder<C>;
416            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
417                "Demuxing Errors",
418                move |(hk, result)| match result {
419                    Err(v) => {
420                        let mut hk_iter = hk.iter();
421                        let h = hk_iter.next().unwrap().unwrap_uint64();
422                        let k = SharedRow::pack(hk_iter);
423                        let message = "Negative multiplicities in TopK";
424                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
425                        Err(EvalError::Internal(message.into()).into())
426                    }
427                    Ok(t) => Ok((hk, t)),
428                },
429            );
430            (input, oks, Some(errs))
431        } else {
432            // Build non-validating topk stage.
433            let (input, stage) =
434                build_topk_negated_stage::<T, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
435                    &input, order_key, offset, limit, arity,
436                );
437            // Turn arrangement into collection.
438            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));
439
440            (input, stage, None)
441        };
442        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
443        (oks.concat(input), errs)
444    }
445
446    fn render_top1_monotonic<'s>(
447        &self,
448        collection: VecCollection<'s, T, Row, Diff>,
449        group_key: Vec<usize>,
450        order_key: Vec<mz_expr::ColumnOrder>,
451        must_consolidate: bool,
452    ) -> (
453        VecCollection<'s, T, Row, Diff>,
454        VecCollection<'s, T, DataflowError, Diff>,
455    ) {
456        // We can place our rows directly into the diff field, and only keep the relevant one
457        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
458        // reduction. We start by mapping the group key along with the row and consolidating
459        // if required to do so.
460        let collection = collection
461            .map({
462                let mut datum_vec = mz_repr::DatumVec::new();
463                move |row| {
464                    // Scoped to allow borrow of `row` to drop.
465                    let group_key = {
466                        let datums = datum_vec.borrow_with(&row);
467                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
468                    };
469                    (group_key, row)
470                }
471            })
472            .consolidate_named_if::<KeyBatcher<_, _, _>>(
473                must_consolidate,
474                "Consolidated MonotonicTop1 input",
475            );
476
477        // It should be now possible to ensure that we have a monotonic collection and process it.
478        let error_logger = self.error_logger();
479        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
480            error_logger.log(
481                "Non-monotonic input to MonotonicTop1",
482                &format!("data={data:?}, diff={diff}"),
483            );
484            let m = "tried to build monotonic top-1 on non-monotonic input".into();
485            (EvalError::Internal(m).into(), Diff::ONE)
486        });
487        let partial: KeyCollection<_, _, _> = partial
488            .explode_one(move |(group_key, row)| {
489                (
490                    group_key,
491                    monoids::Top1Monoid {
492                        row,
493                        order_key: order_key.clone(),
494                    },
495                )
496            })
497            .into();
498        let result = partial
499            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
500                "Arranged MonotonicTop1 partial [val: empty]",
501            )
502            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
503                "MonotonicTop1",
504                move |_key, input, output| {
505                    let accum: &monoids::Top1Monoid = &input[0].1;
506                    output.push((accum.row.clone(), Diff::ONE));
507                },
508            );
509        // TODO(database-issues#2288): Here we discard the arranged output.
510        (result.as_collection(|_k, v| v.to_row()), errs)
511    }
512}
513
514/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of emitted
515/// rows. This has the benefit that we have to maintain state proportionally to size of the output
516/// instead of the size of the input.
517///
518/// Returns two arrangements:
519/// * The arranged input data without modifications, and
520/// * the maintained negated output data.
521fn build_topk_negated_stage<'s, T, Bu, Tr>(
522    input: &VecCollection<'s, T, (Row, Row), Diff>,
523    order_key: Vec<mz_expr::ColumnOrder>,
524    offset: usize,
525    limit: Option<mz_expr::MirScalarExpr>,
526    arity: usize,
527) -> (
528    Arranged<'s, TraceAgent<RowRowSpine<T, Diff>>>,
529    Arranged<'s, TraceAgent<Tr>>,
530)
531where
532    T: MzTimestamp,
533    Bu: Builder<
534            Time = T,
535            Input: Container
536                       + InternalMerge
537                       + ClearContainer
538                       + PushInto<((Row, Tr::ValOwn), T, Diff)>,
539            Output = Tr::Batch,
540        >,
541    Tr: for<'a> Trace<
542            Key<'a> = DatumSeq<'a>,
543            KeyContainer: BatchContainer<Owned = Row>,
544            ValOwn: Data + MaybeValidatingRow<Row, Row>,
545            Time = T,
546            Diff = Diff,
547        > + 'static,
548    Arranged<'s, TraceAgent<Tr>>: ArrangementSize,
549{
550    let mut datum_vec = mz_repr::DatumVec::new();
551
552    // We only want to arrange parts of the input that are not part of the actual output
553    // such that `input.concat(&negated_output)` yields the correct TopK
554    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
555    // built-in view mz_introspection.mz_expected_group_size_advice.
556    let arranged = input
557        .clone()
558        .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
559            "Arranged TopK input",
560        );
561
562    // Eagerly evaluate literal limits.
563    let limit = limit.map(|l| match l.as_literal() {
564        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
565        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
566        _ => Err(l),
567    });
568
569    let reduced = arranged
570        .clone()
571        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
572            move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
573                // Unpack the limit, either into an integer literal or an expression to evaluate.
574                let limit = match &limit {
575                    Some(Ok(lit)) => Some(*lit),
576                    Some(Err(expr)) => {
577                        // Unpack `key` after skipping the hash and determine the limit.
578                        // If the limit errors, use a zero limit; errors are surfaced elsewhere.
579                        let temp_storage = mz_repr::RowArena::new();
580                        let _hash = hash_key.next();
581                        let mut key_datums = datum_vec.borrow();
582                        key_datums.extend(hash_key);
583                        let datum_limit = expr
584                            .eval(&key_datums, &temp_storage)
585                            .unwrap_or(Datum::Int64(0));
586                        Some(match datum_limit {
587                            Datum::Null => Diff::MAX,
588                            d => Diff::from(d.unwrap_int64()),
589                        })
590                    }
591                    None => None,
592                };
593
594                if let Some(err) = Tr::ValOwn::into_error() {
595                    for (datums, diff) in source.iter() {
596                        if diff.is_positive() {
597                            continue;
598                        }
599                        target.push((err((*datums).to_row()), Diff::ONE));
600                        return;
601                    }
602                }
603
604                // Determine if we must actually shrink the result set.
605                let must_shrink = offset > 0
606                    || limit
607                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
608                        .unwrap_or(false);
609                if !must_shrink {
610                    return;
611                }
612
613                // First go ahead and emit all records. Note that we ensure target
614                // has the capacity to hold at least these records, and avoid any
615                // dependencies on the user-provided (potentially unbounded) limit.
616                target.reserve(source.len());
617                for (datums, diff) in source.iter() {
618                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
619                }
620                // local copies that may count down to zero.
621                let mut offset = offset;
622                let mut limit = limit;
623
624                // The order in which we should produce rows.
625                let mut indexes = (0..source.len()).collect::<Vec<_>>();
626                // We decode the datums once, into a common buffer for efficiency.
627                // Each row should contain `arity` columns; we should check that.
628                let mut buffer = datum_vec.borrow();
629                for (index, (datums, _)) in source.iter().enumerate() {
630                    buffer.extend(*datums);
631                    assert_eq!(buffer.len(), arity * (index + 1));
632                }
633                let width = buffer.len() / source.len();
634
635                //todo: use arrangements or otherwise make the sort more performant?
636                indexes.sort_by(|left, right| {
637                    let left = &buffer[left * width..][..width];
638                    let right = &buffer[right * width..][..width];
639                    // Note: source was originally ordered by the u8 array representation
640                    // of rows, but left.cmp(right) uses Datum::cmp.
641                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
642                });
643
644                // We now need to lay out the data in order of `buffer`, but respecting
645                // the `offset` and `limit` constraints.
646                for index in indexes.into_iter() {
647                    let (datums, mut diff) = source[index];
648                    if !diff.is_positive() {
649                        continue;
650                    }
651                    // If we are still skipping early records ...
652                    if offset > 0 {
653                        let to_skip =
654                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
655                        offset -= to_skip;
656                        diff -= Diff::try_from(to_skip).unwrap();
657                    }
658                    // We should produce at most `limit` records.
659                    if let Some(limit) = &mut limit {
660                        diff = std::cmp::min(diff, Diff::from(*limit));
661                        *limit -= diff;
662                    }
663                    // Output the indicated number of rows.
664                    if diff.is_positive() {
665                        // Emit retractions for the elements actually part of
666                        // the set of TopK elements.
667                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
668                    }
669                }
670            }
671        });
672    (arranged, reduced)
673}
674
/// Renders an operator that thins each group down to (at most) its `limit`
/// smallest rows *within* each timestamp, for monotonic TopK plans.
///
/// Input records are `(group_key_row, row)` pairs. For every capability time we
/// buffer records into per-`(group, record_time)` [`topk_agg::TopKBatch`]es,
/// which self-compact to roughly `limit` entries ordered by `order_key`, and we
/// emit the surviving records once the input frontier has passed that time.
/// This bounds the data that downstream TopK stages must consider.
///
/// `limit` is a scalar expression: a constant limit is read once per record via
/// `as_literal_int64`; otherwise it is evaluated against the group key datums.
fn render_intra_ts_thinning<'s, T>(
    collection: VecCollection<'s, T, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> VecCollection<'s, T, (Row, Row), Diff>
where
    T: timely::progress::Timestamp + Lattice,
{
    // Scratch space for decoding group keys when evaluating a non-constant limit.
    let mut datum_vec = mz_repr::DatumVec::new();

    // Pending state: capability time -> ((group row, record time) -> TopKBatch).
    let mut aggregates = BTreeMap::new();
    // Ordering state shared by all `Top1MonoidLocal`s created below: the order
    // key plus two scratch `DatumVec`s used to decode rows during comparisons.
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                input.for_each_time(|time, data| {
                    // Group pending updates under the capability's time.
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
                    {
                        // Wrap the row in a monoid whose `Ord` is the TopK order.
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Evaluate the limit, first as a constant and then against the key if needed.
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            // Unpack `key` and determine the limit.
                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            // A NULL limit means "no limit".
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        // Fold the record into the batch for its (group, time);
                        // the batch compacts itself to ~`limit` entries as it grows.
                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    // Request a notification once the input frontier passes this time.
                    // NOTE(review): `retain(0)` appears to retain a capability for
                    // output port 0 — confirm against the timely API in use.
                    notificator.notify_at(time.retain(0));
                });

                // Once a time is complete, drain its aggregates and emit the
                // thinned (compacted) updates at that time.
                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}
753
754/// Types for in-place intra-ts aggregation of monotonic streams.
755pub mod topk_agg {
756    use differential_dataflow::consolidation;
757    use smallvec::SmallVec;
758
759    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
760    // that. It would be nice to find a way to reuse some or all of the code from there.
761    //
762    // Additionally, because we're calling into DD's consolidate method we are forced to work with
763    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
764    // for each record. It would be nice to also remove the need for cloning that piece of data
765    pub struct TopKBatch<T> {
766        updates: SmallVec<[(T, i64); 16]>,
767        clean: usize,
768        limit: i64,
769    }
770
771    impl<T: Ord> TopKBatch<T> {
772        pub fn new(limit: i64) -> Self {
773            Self {
774                updates: SmallVec::new(),
775                clean: 0,
776                limit,
777            }
778        }
779
780        /// Adds a new update, for `item` with `value`.
781        ///
782        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
783        /// half the length of the list, which would keep the total footprint within reasonable bounds
784        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
785        /// is worth paying without some experimentation.
786        #[inline]
787        pub fn update(&mut self, item: T, value: i64) {
788            self.updates.push((item, value));
789            self.maintain_bounds();
790        }
791
792        /// Compact the internal representation.
793        ///
794        /// This method sort `self.updates` and consolidates elements with equal item, discarding
795        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
796        /// elements is non-zero.
797        #[inline]
798        pub fn compact(&mut self) {
799            if self.clean < self.updates.len() && self.updates.len() > 1 {
800                let len = consolidation::consolidate_slice(&mut self.updates);
801                self.updates.truncate(len);
802
803                // We can now retain only the first K records and throw away everything else
804                let mut limit = self.limit;
805                self.updates.retain(|x| {
806                    if limit > 0 {
807                        limit -= x.1;
808                        true
809                    } else {
810                        false
811                    }
812                });
813                // By the end of the loop above `limit` will either be:
814                // (a) Positive, in which case all updates were retained;
815                // (b) Zero, in which case we discarded all updates after limit became zero;
816                // (c) Negative, in which case the last record we retained had more copies
817                // than necessary. In this latter case, we need to do one final adjustment
818                // of the diff field of the last record so that the total sum of the diffs
819                // in the batch is K.
820                if limit < 0 {
821                    if let Some(item) = self.updates.last_mut() {
822                        // We are subtracting the limit *negated*, therefore we are subtracting a value
823                        // that is *greater* than or equal to zero, which represents the excess.
824                        item.1 -= -limit;
825                    }
826                }
827            }
828            self.clean = self.updates.len();
829        }
830
831        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
832        /// This function tries to minimize work by only compacting if enough work has accumulated.
833        fn maintain_bounds(&mut self) {
834            // if we have more than 32 elements and at least half of them are not clean, compact
835            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
836                self.compact()
837            }
838        }
839    }
840
841    impl<T: Ord> IntoIterator for TopKBatch<T> {
842        type Item = (T, i64);
843        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
844
845        fn into_iter(mut self) -> Self::IntoIter {
846            self.compact();
847            self.updates.into_iter()
848        }
849    }
850}
851
/// Monoids for in-place compaction of monotonic streams.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use columnation::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a row and an ordering.
    ///
    /// Used as a difference type: `plus_equals` keeps whichever of the two rows
    /// sorts first under `order_key`, so accumulation yields the minimum
    /// (top-1) row of each group.
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    // Hand-written (rather than derived) so `clone_from` can reuse the existing
    // allocations of `row` and `order_key`.
    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in Top1Monoid is idempotent, and its
            // users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    // Orders monoids by the TopK order key, falling back to full-datum order.
    // NOTE(review): `PartialEq` is derived (compares `row`/`order_key` fields)
    // while `cmp` compares decoded datums; this assumes equal datum sequences
    // have equal row encodings so `eq` agrees with `cmp == Equal` — confirm.
    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            // Comparing monoids with differing order keys is a logic error.
            debug_assert_eq!(self.order_key, other.order_key);

            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    // Addition keeps the smaller of the two monoids, making accumulation a
    // "minimum" operation.
    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    // A Top1Monoid always carries a row, so it never acts as the additive
    // identity and must never be consolidated away.
    impl IsZero for Top1Monoid {
        fn is_zero(&self) -> bool {
            false
        }
    }

    // Allows Top1Monoid to be stored in columnar (region-allocated) containers.
    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    /// Region storage for [`Top1Monoid`]: one sub-region per field.
    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            // SAFETY: delegated to the field regions; the caller upholds the
            // `Region::copy` contract (returned item must not outlive `self`).
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        // Reserve space in both sub-regions; the iterator is cloned because
        // each sub-region consumes one full pass over the items.
        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        // As above, but reserving based on other regions' allocations.
        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        // Reports heap usage of both sub-regions through `callback`.
        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// A shared portion of a thread-local top-1 monoid implementation.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        // The TopK ordering all monoids sharing this state compare by.
        pub order_key: Vec<ColumnOrder>,
        // Scratch space for decoding the left/right rows during comparison.
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A monoid containing a row and a shared pointer to a shared structure.
    /// Only suitable for thread-local aggregations.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        /// Consumes the monoid, returning its row.
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    // Equality/hashing consider only the row; the shared pointer is identical
    // across all monoids in one aggregation (see the debug_assert in `cmp`).
    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            // Both sides must share the same state; a single `borrow_mut` then
            // covers the order key and both decode buffers. (Reentrant calls
            // would panic on the RefCell, but `cmp` does not recurse.)
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    // Addition keeps the smaller monoid, mirroring `Top1Monoid` above.
    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    // Never the additive identity; see `Top1Monoid`'s `IsZero` above.
    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            false
        }
    }
}