mz_compute/render/top_k.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! TopK execution logic.
//!
//! Consult [TopKPlan] documentation for details.
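//!
//! A TopK dataflow implements per-group `ORDER BY ... LIMIT ... OFFSET ...` semantics.
//! Three rendering paths exist, chosen by the plan variant: [MonotonicTop1Plan] and
//! [MonotonicTopKPlan] exploit monotonic (append-only) inputs, while [BasicTopKPlan]
//! handles arbitrary inputs through a hierarchy of reduction stages.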

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

use differential_dataflow::Data;
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
use differential_dataflow::trace::{Builder, Trace};
use differential_dataflow::{AsCollection, Collection};
use mz_compute_types::plan::top_k::{
    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
};
use mz_expr::func::CastUint64ToInt64;
use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc, func};
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_repr::{Datum, DatumVec, Diff, Row, SharedRow, SqlScalarType};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::render::Pairer;
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
};
use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};

// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<G>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<G> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // We create a new region to compartmentalize the topk logic.
        let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Determine if there should be errors due to limit evaluation; update `err_collection`.
            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
            // is an opportunity to do so for every group key instead if the error handling is
            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
            // adding an error output there; 2. The validating reduction on basic top-k
            // (database-issues#7108).

            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                None => {}
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                Some((_, expr)) => {
                    // Produce errors from limit selectors that error or are
                    // negative, and nothing from limit selectors that do
                    // not. Note that even if expr.could_error() is false,
                    // the expression might still return a negative limit and
                    // thus needs to be checked.
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(&errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // Must permute `limit` to reference `group_key` elements as if in order.
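                    // For example (hypothetical): with `group_key = [3, 1]`, a limit expression
                    // referring to input column 3 must be rewritten to refer to column 0 of the
                    // packed group key, and one referring to column 1 to column 1.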
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    // Map the group key along with the row and consolidate if required to do so.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // It should now be possible to ensure that we have a monotonic collection.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(&errs);

                    // For monotonic inputs, we are able to thin the input relation in two stages:
                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
                    //    being computed in a streaming fashion, even for the initial snapshot.
                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
                    //    any records that have been invalidated.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

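                    // Key each record by its row hash followed by the group key. The single
                    // stage built below reduces the hash modulo 1, so this effectively groups
                    // by the group key while reusing the common `(hash, key)` layout.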
                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
                    // these removable records are those in the input not produced in the output.
                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
                    // stage.
                    let delay = std::time::Duration::from_secs(10);
                    let retractions = SemigroupVariable::new(
                        &mut ok_input.scope(),
                        <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.concat(&retractions.negate());

                    // As an additional optimization, we can skip creating the full topk hierarchy
                    // here since we now have an upper bound on the number of records due to the
                    // intra-ts thinning. The maximum number of records per timestamp is
                    // (num_workers * limit), which we expect to be a small number and so we render
                    // a single topk stage.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be consolidated.
                    let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
                        "Monotonic TopK final consolidate",
                    );
                    retractions.set(&collection.concat(&result.negate()));
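                    // At steady state, `retractions` holds exactly the input records that are
                    // not in the output, so feeding back their negation (after the system delay
                    // above) leaves `thinned` containing only records that remain in the output.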
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // Must permute `limit` to reference `group_key` elements as if in order.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
            };

            // Extract the results from the region.
            (ok_result.leave_region(), err_collection.leave_region())
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Constructs a TopK dataflow subgraph.
    fn build_topk<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        let mut validating = true;
        let mut err_collection: Option<Collection<S, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // We may need a new `limit` that reflects the addition of `offset`.
            // Ideally we compile it down to a literal if at all possible.
            if offset > 0 {
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit =
                        MirScalarExpr::literal_ok(Datum::Int64(new_limit), SqlScalarType::Int64);
                } else {
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            SqlScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64(func::AddInt64),
                    );
                }
            }
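            // Hypothetical example: for `LIMIT 5 OFFSET 2`, every bucket stage below keeps up
            // to 7 rows per subgroup (limit + offset, with the offset left unapplied), and only
            // the final stage after this loop skips 2 rows and keeps 5.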

            // These bucket values define the moduli successively applied to the 64 bit hash of the
            // record, and should have the properties that 1. there are not too many of them,
            // and 2. each has a modest difference to the next.
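            // For instance (hypothetical values), with buckets [65536, 4096, 256, 16] each
            // stage merges roughly 16 subgroups of the previous stage into one, so no single
            // reduction ever has to process an unboundedly large group at once.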
            for bucket in buckets.into_iter() {
                // Here we do not apply `offset`, but instead restrict ourselves with a limit
                // that includes the offset. We cannot apply `offset` until we perform the
                // final, complete reduction.
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // We do a final step, both to make sure that we complete the reduction, and to correctly
        // apply `offset` to the final group, as we have not yet been applying it to the partially
        // formed groups.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be consolidated.
        let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
        collection = oks;
        if validating {
            err_collection = errs;
        }
        (
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// To provide a robust incremental `ORDER BY ... LIMIT` experience, we want to avoid grouping
    /// *all* records (or even large groups) and then applying the ordering and limit. Instead, a
    /// more robust approach forms groups of bounded size and applies the offset and limit to each,
    /// and then increases the sizes of the groups.
    ///
    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
    /// a larger number of arrangements when this optimization does nothing beneficial.
    ///
    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
    /// output size. `arity` represents the number of columns in the input data, and
    /// if `validating` is true, we check for negative multiplicities, which indicate
    /// an error in the input data.
    ///
    /// The output of this function is _not consolidated_.
    ///
    /// The dataflow fragment has the following shape:
    /// ```text
    ///     | input
    ///     |
    ///   arrange
    ///     |\
    ///     | \
    ///     |  reduce
    ///     |  |
    ///     concat
    ///     |
    ///     | output
    /// ```
    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
    /// omitting them here for the sake of simplicity.
    fn build_topk_stage<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        Collection<S, (Row, Row), Diff>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
        // applying `modulus`.
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        // If validating: demux errors, otherwise we cannot produce errors.
        let (input, oks, errs) = if validating {
            // Build topk stage, produce errors for invalid multiplicities.
            let (input, stage) = build_topk_negated_stage::<
                S,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));

            // Demux oks and errors.
            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            // Build non-validating topk stage.
            let (input, stage) =
                build_topk_negated_stage::<S, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            // Turn arrangement into collection.
            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, stage, None)
        };
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        (oks.concat(&input), errs)
    }

    fn render_top1_monotonic<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We can place our rows directly into the diff field, and only keep the relevant one
        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
        // reduction. We start by mapping the group key along with the row and consolidating
        // if required to do so.
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    // Scoped to allow borrow of `row` to drop.
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // It should now be possible to ensure that we have a monotonic collection and process it.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        // TODO(database-issues#2288): Here we discard the arranged output.
        (result.as_collection(|_k, v| v.to_row()), errs)
    }
}

/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of emitted
/// rows. This has the benefit that we have to maintain state proportional to the size of the output
/// instead of the size of the input.
///
/// Returns two arrangements:
/// * The arranged input data without modifications, and
/// * the maintained negated output data.
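///
/// Hypothetical example: with `offset = 0`, `limit = 2`, and a group whose rows order as
/// `a < b < c` (each with multiplicity 1), the reduction first pushes `a`, `b`, and `c` with
/// negated diffs and then re-adds `a` and `b`, so the maintained output is `{c: -1}`;
/// concatenating it with the input yields exactly the top two rows `a` and `b`.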
fn build_topk_negated_stage<G, Bu, Tr>(
    input: &Collection<G, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
    Arranged<G, TraceAgent<Tr>>,
)
where
    G: Scope,
    G::Timestamp: MzTimestamp,
    Bu: Builder<
            Time = G::Timestamp,
            Input: Container + MergerChunk + PushInto<((Row, Tr::ValOwn), G::Timestamp, Diff)>,
            Output = Tr::Batch,
        >,
    Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = G::Timestamp,
            Diff = Diff,
        > + 'static,
    Arranged<G, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // We only want to arrange parts of the input that are not part of the actual output
    // such that `input.concat(&negated_output)` yields the correct TopK
    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
    // built-in view mz_introspection.mz_expected_group_size_advice.
    let arranged = input.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
        "Arranged TopK input",
    );

    // Eagerly evaluate literal limits.
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged.mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
        move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
            // Unpack the limit, either into an integer literal or an expression to evaluate.
            let limit = match &limit {
                Some(Ok(lit)) => Some(*lit),
                Some(Err(expr)) => {
                    // Unpack `key` after skipping the hash and determine the limit.
                    // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                    let temp_storage = mz_repr::RowArena::new();
                    let _hash = hash_key.next();
                    let mut key_datums = datum_vec.borrow();
                    key_datums.extend(hash_key);
                    let datum_limit = expr
                        .eval(&key_datums, &temp_storage)
                        .unwrap_or(Datum::Int64(0));
                    Some(match datum_limit {
                        Datum::Null => Diff::MAX,
                        d => Diff::from(d.unwrap_int64()),
                    })
                }
                None => None,
            };

            if let Some(err) = Tr::ValOwn::into_error() {
                for (datums, diff) in source.iter() {
                    if diff.is_positive() {
                        continue;
                    }
                    target.push((err((*datums).to_row()), Diff::ONE));
                    return;
                }
            }

            // Determine if we must actually shrink the result set.
            let must_shrink = offset > 0
                || limit
                    .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                    .unwrap_or(false);
            if !must_shrink {
                return;
            }

            // First, retract all records (push each with a negated diff); the positive pushes
            // below will cancel these retractions for the records retained by TopK. Note that
            // we ensure target has the capacity to hold at least these records, and avoid any
            // dependencies on the user-provided (potentially unbounded) limit.
            target.reserve(source.len());
            for (datums, diff) in source.iter() {
                target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
            }
            // local copies that may count down to zero.
            let mut offset = offset;
            let mut limit = limit;

            // The order in which we should produce rows.
            let mut indexes = (0..source.len()).collect::<Vec<_>>();
            // We decode the datums once, into a common buffer for efficiency.
            // Each row should contain `arity` columns; we should check that.
            let mut buffer = datum_vec.borrow();
            for (index, (datums, _)) in source.iter().enumerate() {
                buffer.extend(*datums);
                assert_eq!(buffer.len(), arity * (index + 1));
            }
            let width = buffer.len() / source.len();

            // TODO: use arrangements or otherwise make the sort more performant?
            indexes.sort_by(|left, right| {
                let left = &buffer[left * width..][..width];
                let right = &buffer[right * width..][..width];
                // Note: source was originally ordered by the u8 array representation
                // of rows, but left.cmp(right) uses Datum::cmp.
                mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
            });

            // We now need to lay out the data in order of `buffer`, but respecting
            // the `offset` and `limit` constraints.
            for index in indexes.into_iter() {
                let (datums, mut diff) = source[index];
                if !diff.is_positive() {
                    continue;
                }
                // If we are still skipping early records ...
                if offset > 0 {
                    let to_skip =
                        std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                    offset -= to_skip;
                    diff -= Diff::try_from(to_skip).unwrap();
                }
                // We should produce at most `limit` records.
                if let Some(limit) = &mut limit {
                    diff = std::cmp::min(diff, Diff::from(*limit));
                    *limit -= diff;
                }
                // Output the indicated number of rows.
                if diff.is_positive() {
                    // Add the record back in, cancelling the blanket retraction above, for
                    // the elements that are actually part of the TopK result.
                    target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                }
            }
        }
    });
    (arranged, reduced)
}

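/// Intra-timestamp thinning for monotonic TopK inputs: per worker, each `(group, time)` pair
/// retains at most `limit` rows (tracked by a [topk_agg::TopKBatch]), bounding the number of
/// candidate records the arrangement-based stage downstream must absorb per timestamp.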
fn render_intra_ts_thinning<S>(
    collection: Collection<S, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> Collection<S, (Row, Row), Diff>
where
    S: Scope,
    S::Timestamp: Lattice,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    let mut aggregates = BTreeMap::new();
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                while let Some((time, data)) = input.next() {
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.drain(..) {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Evaluate the limit, first as a constant and then against the key if needed.
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            // Unpack `key` and determine the limit.
                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    notificator.notify_at(time.retain());
                }

                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}

/// Types for in-place intra-ts aggregation of monotonic streams.
pub mod topk_agg {
    use differential_dataflow::consolidation;
    use smallvec::SmallVec;

    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
    // that. It would be nice to find a way to reuse some or all of the code from there.
    //
    // Additionally, because we're calling into DD's consolidate method we are forced to work with
    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
    // for each record. It would be nice to also remove the need for cloning that piece of data
    pub struct TopKBatch<T> {
        updates: SmallVec<[(T, i64); 16]>,
        clean: usize,
        limit: i64,
    }

    impl<T: Ord> TopKBatch<T> {
        pub fn new(limit: i64) -> Self {
            Self {
                updates: SmallVec::new(),
                clean: 0,
                limit,
            }
        }

        /// Adds a new update, for `item` with `value`.
        ///
        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
        /// half the length of the list, which would keep the total footprint within reasonable bounds
        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
        /// is worth paying without some experimentation.
        #[inline]
        pub fn update(&mut self, item: T, value: i64) {
            self.updates.push((item, value));
            self.maintain_bounds();
        }

        /// Compact the internal representation.
        ///
        /// This method sorts `self.updates` and consolidates elements with equal item, discarding
        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
        /// elements is non-zero.
        #[inline]
        pub fn compact(&mut self) {
            if self.clean < self.updates.len() && self.updates.len() > 1 {
                let len = consolidation::consolidate_slice(&mut self.updates);
                self.updates.truncate(len);

                // We can now retain only the first K records and throw away everything else
                let mut limit = self.limit;
                self.updates.retain(|x| {
                    if limit > 0 {
                        limit -= x.1;
                        true
                    } else {
                        false
                    }
                });
                // By the end of the loop above `limit` will either be:
                // (a) Positive, in which case all updates were retained;
                // (b) Zero, in which case we discarded all updates after limit became zero;
                // (c) Negative, in which case the last record we retained had more copies
                // than necessary. In this latter case, we need to do one final adjustment
                // of the diff field of the last record so that the total sum of the diffs
                // in the batch is K.
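                // Hypothetical example: with `limit = 3` and consolidated updates
                // [(a, 2), (b, 2), (c, 1)], the retain loop keeps a and b and leaves
                // `limit = -1`, so b's diff is reduced from 2 to 1 and the batch sums to 3.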
                if limit < 0 {
                    if let Some(item) = self.updates.last_mut() {
                        // We are subtracting the limit *negated*, therefore we are subtracting a value
                        // that is *greater* than or equal to zero, which represents the excess.
                        item.1 -= -limit;
                    }
                }
            }
            self.clean = self.updates.len();
        }

        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
        /// This function tries to minimize work by only compacting if enough work has accumulated.
        fn maintain_bounds(&mut self) {
            // if we have more than 32 elements and at least half of them are not clean, compact
            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
                self.compact()
            }
        }
    }

    impl<T: Ord> IntoIterator for TopKBatch<T> {
        type Item = (T, i64);
        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;

        fn into_iter(mut self) -> Self::IntoIter {
            self.compact();
            self.updates.into_iter()
        }
    }
}

/// Monoids for in-place compaction of monotonic streams.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a row and an ordering.
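    ///
    /// "Addition" keeps whichever row sorts first under `order_key` (see the [Semigroup] impl
    /// below), so accumulating these values in the difference field of a collection yields the
    /// per-group top-1 row without a hierarchical reduction.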
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in Top1Monoid is idempotent, and its
            // users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert_eq!(self.order_key, other.order_key);

            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// A shared portion of a thread-local top-1 monoid implementation.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A monoid containing a row and a shared pointer to a shared structure.
    /// Only suitable for thread-local aggregations.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            false
        }
    }
}