// mz_compute/render/top_k.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! TopK execution logic.
//!
//! Consult [TopKPlan] documentation for details.

14use std::cell::RefCell;
15use std::collections::BTreeMap;
16use std::rc::Rc;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::hashable::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
22use differential_dataflow::operators::iterate::Variable as SemigroupVariable;
23use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
24use differential_dataflow::trace::{Builder, Trace};
25use differential_dataflow::{Data, VecCollection};
26use mz_compute_types::plan::top_k::{
27    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
28};
29use mz_expr::func::CastUint64ToInt64;
30use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
31use mz_ore::cast::CastFrom;
32use mz_ore::soft_assert_or_log;
33use mz_repr::{Datum, DatumVec, Diff, ReprScalarType, Row, SharedRow};
34use mz_storage_types::errors::DataflowError;
35use mz_timely_util::operator::CollectionExt;
36use timely::Container;
37use timely::container::{CapacityContainerBuilder, PushInto};
38use timely::dataflow::Scope;
39use timely::dataflow::channels::pact::Pipeline;
40use timely::dataflow::operators::Operator;
41
42use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
43use crate::extensions::reduce::MzReduce;
44use crate::render::Pairer;
45use crate::render::context::{CollectionBundle, Context};
46use crate::render::errors::MaybeValidatingRow;
47use crate::row_spine::{
48    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
49};
50use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};
51
// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
    /// Renders `top_k_plan` against `input`, dispatching on the plan variant
    /// (monotonic top-1, monotonic top-k, or basic hierarchical top-k).
    ///
    /// Returns a bundle containing the TopK output rows together with an error
    /// collection that accumulates: limit-expression evaluation errors (including
    /// negative limits), non-monotonicity violations, and any errors produced by
    /// the underlying reduction stages.
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<G>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<G> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // We create a new region to compartmentalize the topk logic.
        let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Determine if there should be errors due to limit evaluation; update `err_collection`.
            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
            // is an opportunity to do so for every group key instead if the error handling is
            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
            // adding an error output there; 2. The validating reduction on basic top-k
            // (database-issues#7108).

            // Fast path: a literal limit that is NULL or non-negative can never error,
            // so no error-producing operator needs to be rendered at all.
            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                None => {}
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                Some((_, expr)) => {
                    // Produce errors from limit selectors that error or are
                    // negative, and nothing from limit selectors that do
                    // not. Note that even if expr.could_error() is false,
                    // the expression might still return a negative limit and
                    // thus needs to be checked.
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.clone().flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // Must permute `limit` to reference `group_key` elements as if in order.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    // Map the group key along with the row and consolidate if required to do so.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let mut ok_scope = ok_input.scope();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // It should be now possible to ensure that we have a monotonic collection.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(errs);

                    // For monotonic inputs, we are able to thin the input relation in two stages:
                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
                    //    being computed in a streaming fashion, even for the initial snapshot.
                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
                    //    any records that have been invalidated.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    // Prepend a hash of the full row to the group key; downstream stages
                    // use this leading hash datum for work distribution/modulus bucketing.
                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
                    // these removable records are those in the input not produced in the output.
                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
                    // stage.
                    // NOTE(review): the retraction feedback is delayed by 10s of system time —
                    // presumably to batch retractions rather than chase every update; confirm
                    // the rationale against `RenderTimestamp::system_delay` docs.
                    let delay = std::time::Duration::from_secs(10);
                    let (retractions_var, retractions) = SemigroupVariable::new(
                        &mut ok_scope,
                        <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.clone().concat(retractions.negate());

                    // As an additional optimization, we can skip creating the full topk hierachy
                    // here since we now have an upper bound on the number records due to the
                    // intra-ts thinning. The maximum number of records per timestamp is
                    // (num_workers * limit), which we expect to be a small number and so we render
                    // a single topk stage.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
                    let result = CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(
                        result,
                        "Monotonic TopK final consolidate",
                    );
                    // Close the feedback loop: everything in the (thinned) input that is not
                    // in the output is fed back as a retraction.
                    retractions_var.set(collection.concat(result.clone().negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // Must permute `limit` to reference `group_key` elements as if in order.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(errs);
                    oks
                }
            };

            // Extract the results from the region.
            (ok_result.leave_region(), err_collection.leave_region())
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Constructs a TopK dataflow subgraph.
    ///
    /// Builds a hierarchy of reduction stages, one per entry in `buckets`, each
    /// grouping by `(hash % bucket, group_key)`, followed by a final complete
    /// reduction that also applies `offset`. The first stage validates input
    /// multiplicities and yields the error collection; later stages skip
    /// validation.
    ///
    /// * `group_key` — input column indices forming the grouping key.
    /// * `order_key` — ordering used to select the top rows within each group.
    /// * `offset` — number of leading rows to skip per group (applied only in
    ///   the final stage).
    /// * `limit` — optional expression evaluated against the group key to bound
    ///   the rows produced per group.
    /// * `arity` — number of columns in the input rows.
    /// * `buckets` — moduli applied to the row hash for the intermediate stages.
    fn build_topk<S>(
        &self,
        collection: VecCollection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Key each row by (row_hash, group_key datums); the leading hash datum is
        // what the per-stage modulus is applied to.
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        let mut validating = true;
        let mut err_collection: Option<VecCollection<S, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // We may need a new `limit` that reflects the addition of `offset`.
            // Ideally we compile it down to a literal if at all possible.
            if offset > 0 {
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit =
                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), ReprScalarType::Int64);
                } else {
                    // Non-literal (or overflowing) case: build `limit + offset` as an
                    // expression, casting the offset from u64 to i64 for the addition.
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            ReprScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64(func::AddInt64),
                    );
                }
            }

            // These bucket values define the shifts that happen to the 64 bit hash of the
            // record, and should have the properties that 1. there are not too many of them,
            // and 2. each has a modest difference to the next.
            for bucket in buckets.into_iter() {
                // here we do not apply `offset`, but instead restrict ourself with a limit
                // that includes the offset. We cannot apply `offset` until we perform the
                // final, complete reduction.
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    // Only the first stage validates; its error output is the one we keep.
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // We do a final step, both to make sure that we complete the reduction, and to correctly
        // apply `offset` to the final group, as we have not yet been applying it to the partially
        // formed groups.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
        let oks =
            CollectionExt::consolidate_named::<KeyBatcher<_, _, _>>(oks, "TopK final consolidate");
        collection = oks;
        if validating {
            // No buckets were configured, so the final stage did the validation.
            err_collection = errs;
        }
        (
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// To provide a robust incremental orderby-limit experience, we want to avoid grouping *all*
    /// records (or even large groups) and then applying the ordering and limit. Instead, a more
    /// robust approach forms groups of bounded size and applies the offset and limit to each,
    /// and then increases the sizes of the groups.
    ///
    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
    /// a larger number of arrangements when this optimization does nothing beneficial.
    ///
    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
    /// output size. `arity` represents the number of columns in the input data, and
    /// if `validating` is true, we check for negative multiplicities, which indicate
    /// an error in the input data.
    ///
    /// The output of this function is _not consolidated_.
    ///
    /// The dataflow fragment has the following shape:
    /// ```text
    ///     | input
    ///     |
    ///   arrange
    ///     |\
    ///     | \
    ///     |  reduce
    ///     |  |
    ///     concat
    ///     |
    ///     | output
    /// ```
    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
    /// omitting them here for the sake of simplicity.
    fn build_topk_stage<S>(
        &self,
        collection: VecCollection<S, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        VecCollection<S, (Row, Row), Diff>,
        Option<VecCollection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
        // applying `modulus`.
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        // If validating: demux errors, otherwise we cannot produce errors.
        let (input, oks, errs) = if validating {
            // Build topk stage, produce errors for invalid multiplicities.
            // The value type is `Result<Row, Row>`: `Err` carries the offending row.
            let (input, stage) = build_topk_negated_stage::<
                S,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));

            // Demux oks and errors.
            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        // Log the offending key (sans hash) and row before surfacing
                        // an internal error to the error collection.
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            // Build non-validating topk stage.
            let (input, stage) =
                build_topk_negated_stage::<S, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            // Turn arrangement into collection.
            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, stage, None)
        };
        // The stage maintains *negated* output; concatenating it with the input
        // yields the TopK result (see `build_topk_negated_stage`).
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        (oks.concat(input), errs)
    }

    /// Renders a monotonic top-1 via a single reduction whose diff type is a
    /// [`monoids::Top1Monoid`]: each group's winning row is selected by the
    /// monoid's accumulation, avoiding a hierarchical reduction entirely.
    ///
    /// * `group_key` — input column indices forming the grouping key.
    /// * `order_key` — ordering that determines the winning row per group.
    /// * `must_consolidate` — whether the keyed input must be consolidated first.
    ///
    /// Returns the per-group winning rows plus an error collection reporting
    /// non-monotonic inputs.
    fn render_top1_monotonic<S>(
        &self,
        collection: VecCollection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We can place our rows directly into the diff field, and only keep the relevant one
        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
        // reduction. We start by mapping the group key along with the row and consolidating
        // if required to do so.
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    // Scoped to allow borrow of `row` to drop.
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // It should be now possible to ensure that we have a monotonic collection and process it.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        // Move each row into the diff position as a `Top1Monoid`; accumulation of
        // these monoids performs the per-group top-1 selection.
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    // The accumulated monoid holds the winning row for this group.
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        // TODO(database-issues#2288): Here we discard the arranged output.
        (result.as_collection(|_k, v| v.to_row()), errs)
    }
}
522
/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of emitted
/// rows. This has the benefit that we have to maintain state proportionally to size of the output
/// instead of the size of the input.
///
/// Returns two arrangements:
/// * The arranged input data without modifications, and
/// * the maintained negated output data.
///
/// The caller obtains the TopK result by concatenating the input with the
/// (negated) reduce output. A `limit` that evaluates to `Datum::Null` means
/// "no limit" and is treated as `Diff::MAX`. The trace value type `Tr::ValOwn`
/// decides validation: if it can represent errors
/// (`MaybeValidatingRow::into_error()` is `Some`), rows with invalid
/// multiplicities are emitted as errors instead of being processed.
fn build_topk_negated_stage<G, Bu, Tr>(
    input: &VecCollection<G, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
    Arranged<G, TraceAgent<Tr>>,
)
where
    G: Scope,
    G::Timestamp: MzTimestamp,
    Bu: Builder<
            Time = G::Timestamp,
            Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), G::Timestamp, Diff)>,
            Output = Tr::Batch,
        >,
    Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = G::Timestamp,
            Diff = Diff,
        > + 'static,
    Arranged<G, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // We only want to arrange parts of the input that are not part of the actual output
    // such that `input.concat(&negated_output)` yields the correct TopK
    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
    // built-in view mz_introspection.mz_expected_group_size_advice.
    let arranged = input
        .clone()
        .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
            "Arranged TopK input",
        );

    // Eagerly evaluate literal limits.
    // `Ok(lit)` is a precomputed limit; `Err(expr)` must be evaluated per key.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged
        .clone()
        .mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
            move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
                // Unpack the limit, either into an integer literal or an expression to evaluate.
                let limit = match &limit {
                    Some(Ok(lit)) => Some(*lit),
                    Some(Err(expr)) => {
                        // Unpack `key` after skipping the hash and determine the limit.
                        // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                        let temp_storage = mz_repr::RowArena::new();
                        let _hash = hash_key.next();
                        let mut key_datums = datum_vec.borrow();
                        key_datums.extend(hash_key);
                        let datum_limit = expr
                            .eval(&key_datums, &temp_storage)
                            .unwrap_or(Datum::Int64(0));
                        Some(match datum_limit {
                            Datum::Null => Diff::MAX,
                            d => Diff::from(d.unwrap_int64()),
                        })
                    }
                    None => None,
                };

                // Validating mode only: surface the first non-positive multiplicity
                // as a single error value and stop processing this group.
                if let Some(err) = Tr::ValOwn::into_error() {
                    for (datums, diff) in source.iter() {
                        if diff.is_positive() {
                            continue;
                        }
                        target.push((err((*datums).to_row()), Diff::ONE));
                        return;
                    }
                }

                // Determine if we must actually shrink the result set.
                // If neither offset nor limit bites, the (empty) retraction set is correct.
                let must_shrink = offset > 0
                    || limit
                        .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                        .unwrap_or(false);
                if !must_shrink {
                    return;
                }

                // First go ahead and emit all records. Note that we ensure target
                // has the capacity to hold at least these records, and avoid any
                // dependencies on the user-provided (potentially unbounded) limit.
                target.reserve(source.len());
                for (datums, diff) in source.iter() {
                    target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
                }
                // local copies that may count down to zero.
                let mut offset = offset;
                let mut limit = limit;

                // The order in which we should produce rows.
                let mut indexes = (0..source.len()).collect::<Vec<_>>();
                // We decode the datums once, into a common buffer for efficiency.
                // Each row should contain `arity` columns; we should check that.
                let mut buffer = datum_vec.borrow();
                for (index, (datums, _)) in source.iter().enumerate() {
                    buffer.extend(*datums);
                    assert_eq!(buffer.len(), arity * (index + 1));
                }
                // Given the assert above, `width == arity`; rows occupy fixed-size
                // slices of `buffer`.
                let width = buffer.len() / source.len();

                //todo: use arrangements or otherwise make the sort more performant?
                indexes.sort_by(|left, right| {
                    let left = &buffer[left * width..][..width];
                    let right = &buffer[right * width..][..width];
                    // Note: source was originally ordered by the u8 array representation
                    // of rows, but left.cmp(right) uses Datum::cmp.
                    mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
                });

                // We now need to lay out the data in order of `buffer`, but respecting
                // the `offset` and `limit` constraints. Rows retained here cancel their
                // blanket retraction pushed above, so only the non-TopK remainder stays
                // negated in the output.
                for index in indexes.into_iter() {
                    let (datums, mut diff) = source[index];
                    if !diff.is_positive() {
                        continue;
                    }
                    // If we are still skipping early records ...
                    if offset > 0 {
                        let to_skip =
                            std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                        offset -= to_skip;
                        diff -= Diff::try_from(to_skip).unwrap();
                    }
                    // We should produce at most `limit` records.
                    if let Some(limit) = &mut limit {
                        diff = std::cmp::min(diff, Diff::from(*limit));
                        *limit -= diff;
                    }
                    // Output the indicated number of rows.
                    if diff.is_positive() {
                        // Emit retractions for the elements actually part of
                        // the set of TopK elements.
                        target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                    }
                }
            }
        });
    (arranged, reduced)
}
681
/// Renders intra-timestamp thinning for a monotonic TopK input.
///
/// For each timely timestamp, group key, and record time, incoming rows are
/// accumulated into a `topk_agg::TopKBatch` bounded by `limit`, and only the
/// surviving rows (at most `limit`-many per group, smallest by `order_key`)
/// are emitted once the timestamp is complete. This bounds the data flowing
/// into the downstream TopK reduction without changing its result.
fn render_intra_ts_thinning<S>(
    collection: VecCollection<S, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> VecCollection<S, (Row, Row), Diff>
where
    S: Scope,
    S::Timestamp: Lattice,
{
    // Scratch space for decoding a group key when evaluating a non-literal limit.
    let mut datum_vec = mz_repr::DatumVec::new();

    // Pending state per not-yet-complete timestamp:
    // time -> ((group row, record time) -> TopK batch).
    let mut aggregates = BTreeMap::new();
    // Ordering state shared by all `Top1MonoidLocal` values, so that each monoid
    // only needs to carry its `Row`. Thread-local (`Rc`) by construction.
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                input.for_each_time(|time, data| {
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.flat_map(|data| data.drain(..))
                    {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Evaluate the limit, first as a constant and then against the key if needed.
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            // Unpack `key` and determine the limit.
                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            // A NULL limit means "no limit".
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    // Ask to be notified once `time` is complete, so we can emit below.
                    notificator.notify_at(time.retain(0));
                });

                // Emit (and drop) the accumulated state for each completed timestamp.
                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}
761
762/// Types for in-place intra-ts aggregation of monotonic streams.
763pub mod topk_agg {
764    use differential_dataflow::consolidation;
765    use smallvec::SmallVec;
766
767    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
768    // that. It would be nice to find a way to reuse some or all of the code from there.
769    //
770    // Additionally, because we're calling into DD's consolidate method we are forced to work with
771    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
772    // for each record. It would be nice to also remove the need for cloning that piece of data
773    pub struct TopKBatch<T> {
774        updates: SmallVec<[(T, i64); 16]>,
775        clean: usize,
776        limit: i64,
777    }
778
779    impl<T: Ord> TopKBatch<T> {
780        pub fn new(limit: i64) -> Self {
781            Self {
782                updates: SmallVec::new(),
783                clean: 0,
784                limit,
785            }
786        }
787
788        /// Adds a new update, for `item` with `value`.
789        ///
790        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
791        /// half the length of the list, which would keep the total footprint within reasonable bounds
792        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
793        /// is worth paying without some experimentation.
794        #[inline]
795        pub fn update(&mut self, item: T, value: i64) {
796            self.updates.push((item, value));
797            self.maintain_bounds();
798        }
799
800        /// Compact the internal representation.
801        ///
802        /// This method sort `self.updates` and consolidates elements with equal item, discarding
803        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
804        /// elements is non-zero.
805        #[inline]
806        pub fn compact(&mut self) {
807            if self.clean < self.updates.len() && self.updates.len() > 1 {
808                let len = consolidation::consolidate_slice(&mut self.updates);
809                self.updates.truncate(len);
810
811                // We can now retain only the first K records and throw away everything else
812                let mut limit = self.limit;
813                self.updates.retain(|x| {
814                    if limit > 0 {
815                        limit -= x.1;
816                        true
817                    } else {
818                        false
819                    }
820                });
821                // By the end of the loop above `limit` will either be:
822                // (a) Positive, in which case all updates were retained;
823                // (b) Zero, in which case we discarded all updates after limit became zero;
824                // (c) Negative, in which case the last record we retained had more copies
825                // than necessary. In this latter case, we need to do one final adjustment
826                // of the diff field of the last record so that the total sum of the diffs
827                // in the batch is K.
828                if limit < 0 {
829                    if let Some(item) = self.updates.last_mut() {
830                        // We are subtracting the limit *negated*, therefore we are subtracting a value
831                        // that is *greater* than or equal to zero, which represents the excess.
832                        item.1 -= -limit;
833                    }
834                }
835            }
836            self.clean = self.updates.len();
837        }
838
839        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
840        /// This function tries to minimize work by only compacting if enough work has accumulated.
841        fn maintain_bounds(&mut self) {
842            // if we have more than 32 elements and at least half of them are not clean, compact
843            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
844                self.compact()
845            }
846        }
847    }
848
849    impl<T: Ord> IntoIterator for TopKBatch<T> {
850        type Item = (T, i64);
851        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;
852
853        fn into_iter(mut self) -> Self::IntoIter {
854            self.compact();
855            self.updates.into_iter()
856        }
857    }
858}
859
/// Monoids for in-place compaction of monotonic streams.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a row and an ordering.
    ///
    /// Summing these values (via `Semigroup::plus_equals`) keeps the minimum
    /// with respect to `order_key`, breaking ties with the full row ordering,
    /// so a monotonic Top1 can be computed by accumulation alone.
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    // Implemented by hand (rather than derived) so that `clone_from` can reuse
    // the allocations already held by the destination's `row` and `order_key`.
    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in Top1Monoid is idempotent, and its
            // users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            // Comparing monoids with different orderings would be meaningless;
            // within a group all monoids carry the same `order_key`.
            debug_assert_eq!(self.order_key, other.order_key);

            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        fn is_zero(&self) -> bool {
            // There is no zero value: accumulation only ever replaces the held
            // row, never cancels it, so updates are never discarded as empty.
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    /// Columnation region for [`Top1Monoid`], delegating to the regions of its
    /// `row` and `order_key` fields.
    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            // SAFETY: we only forward `Region::copy` to the per-field regions;
            // the caller upholds the `Region::copy` contract for the returned
            // composite item.
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            // Clone the iterator so each per-field region can consume it once.
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            // As above, clone the iterator for the two per-field regions.
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// A shared portion of a thread-local top-1 monoid implementation.
    ///
    /// Holds the ordering and two scratch `DatumVec`s so that individual
    /// [`Top1MonoidLocal`] values need only carry their `Row`.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A monoid containing a row and a shared pointer to a shared structure.
    /// Only suitable for thread-local aggregations.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        /// Discards the shared pointer and returns the contained row.
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    // Equality, like hashing below, considers only the row; the shared state
    // is expected to be pointer-identical across compared values (see the
    // `debug_assert` in `Ord::cmp`).
    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            // A single `borrow_mut` suffices (and must be used) since both
            // sides point at the same shared state.
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            // As with `Top1Monoid`: no zero value exists, updates never cancel.
            false
        }
    }
}