mz_compute/render/top_k.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! TopK execution logic.
//!
//! Consult [TopKPlan] documentation for details.

use columnar::Columnar;
use differential_dataflow::IntoOwned;
use differential_dataflow::containers::Columnation;
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use mz_compute_types::plan::top_k::{
    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
};
use mz_expr::func::CastUint64ToInt64;
use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc};
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_repr::{Datum, DatumVec, Diff, Row, ScalarType, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::render::Pairer;
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
};
use crate::typedefs::{KeyBatcher, RowRowSpine, RowSpine};

// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
    <G::Timestamp as Columnar>::Container: Clone + Send,
{
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<G>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<G> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // We create a new region to compartmentalize the topk logic.
        let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Determine if there should be errors due to limit evaluation; update `err_collection`.
            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
            // is an opportunity to do so for every group key instead if the error handling is
            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
            // adding an error output there; 2. The validating reduction on basic top-k
            // (database-issues#7108).
            let limit_err = match &top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan { .. }) => None,
                TopKPlan::MonotonicTopK(MonotonicTopKPlan { limit, .. }) => Some(limit),
                TopKPlan::Basic(BasicTopKPlan { limit, .. }) => Some(limit),
            };
            if let Some(limit) = limit_err {
                if let Some(expr) = limit {
                    // Produce errors from limit selectors that error or are
                    // negative, and nothing from limit selectors that do
                    // not. Note that even if expr.could_error() is false,
                    // the expression might still return a negative limit and
                    // thus needs to be checked.
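                    // For instance (hypothetical schema), a plain column
                    // reference `LIMIT k` can never error, yet `k` may still
                    // be negative for some group and must be rejected here.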
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(&errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // Must permute `limit` to reference `group_key` elements by their position in the key.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    // Map the group key along with the row and consolidate if required to do so.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // It should now be possible to ensure that we have a monotonic collection.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(&errs);

                    // For monotonic inputs, we are able to thin the input relation in two stages:
                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
                    //    being computed in a streaming fashion, even for the initial snapshot.
                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
                    //    any records that have been invalidated.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
                    // these removable records are those in the input not produced in the output.
                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
                    // stage.
                    use differential_dataflow::operators::iterate::Variable;
                    let delay = std::time::Duration::from_secs(10);
                    let retractions = Variable::new(
                        &mut ok_input.scope(),
                        <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.concat(&retractions.negate());

                    // As an additional optimization, we can skip creating the full topk hierarchy
                    // here since we now have an upper bound on the number of records due to the
                    // intra-ts thinning. The maximum number of records per timestamp is
                    // (num_workers * limit), which we expect to be a small number and so we render
                    // a single topk stage.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
                    let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
                        "Monotonic TopK final consolidate",
                    );
                    retractions.set(&collection.concat(&result.negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // Must permute `limit` to reference `group_key` elements by their position in the key.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
            };

            // Extract the results from the region.
            (ok_result.leave_region(), err_collection.leave_region())
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Constructs a TopK dataflow subgraph.
    fn build_topk<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        let mut validating = true;
        let mut err_collection: Option<Collection<S, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // We may need a new `limit` that reflects the addition of `offset`.
            // Ideally we compile it down to a literal if at all possible.
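            // For example (hypothetical values): `LIMIT 5 OFFSET 3` folds to a
            // literal limit of 8 for the intermediate bucket stages, while the
            // final stage at the end of this function applies the original
            // limit of 5 together with the offset of 3.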
            if offset > 0 {
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit = MirScalarExpr::literal_ok(Datum::Int64(new_limit), ScalarType::Int64);
                } else {
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            ScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64,
                    );
                }
            }

            // These bucket values define the shifts that happen to the 64 bit hash of the
            // record, and should have the properties that 1. there are not too many of them,
            // and 2. each has a modest difference to the next.
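            // For example (hypothetical values): `buckets = [4096, 64]` renders two
            // intermediate stages that reduce within groups keyed by `hash % 4096`
            // and then `hash % 64`, before the final stage below completes the
            // reduction with a modulus of 1, i.e., per group key alone.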
            for bucket in buckets.into_iter() {
                // Here we do not apply `offset`, but instead restrict ourselves with a limit
                // that includes the offset. We cannot apply `offset` until we perform the
                // final, complete reduction.
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // We do a final step, both to make sure that we complete the reduction, and to correctly
        // apply `offset` to the final group, as we have not yet been applying it to the partially
        // formed groups.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
        let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
        collection = oks;
        if validating {
            err_collection = errs;
        }
        (
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// To provide a robust incremental orderby-limit experience, we want to avoid grouping *all*
    /// records (or even large groups) and then applying the ordering and limit. Instead, a more
    /// robust approach forms groups of bounded size and applies the offset and limit to each,
    /// and then increases the sizes of the groups.
    ///
    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
    /// a larger number of arrangements when this optimization does nothing beneficial.
    ///
    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
    /// output size. `arity` represents the number of columns in the input data, and
    /// if `validating` is true, we check for negative multiplicities, which indicate
    /// an error in the input data.
    ///
    /// The output of this function is _not consolidated_.
    ///
    /// The dataflow fragment has the following shape:
    /// ```text
    ///     | input
    ///     |
    ///   arrange
    ///     |\
    ///     | \
    ///     |  reduce
    ///     |  |
    ///     concat
    ///     |
    ///     | output
    /// ```
    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
    /// omitting them here for the sake of simplicity.
    fn build_topk_stage<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        Collection<S, (Row, Row), Diff>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
        // applying `modulus`.
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        // If validating: demux errors, otherwise we cannot produce errors.
        let (input, oks, errs) = if validating {
            // Build topk stage, produce errors for invalid multiplicities.
            let (input, stage) = build_topk_negated_stage::<
                S,
                _,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.into_owned(), v.clone()));

            // Demux oks and errors.
            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            // Build non-validating topk stage.
            let (input, stage) =
                build_topk_negated_stage::<S, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            // Turn arrangement into collection.
            let stage = stage.as_collection(|k, v| (k.into_owned(), v.into_owned()));

            (input, stage, None)
        };
        let input = input.as_collection(|k, v| (k.into_owned(), v.into_owned()));
        (oks.concat(&input), errs)
    }

    fn render_top1_monotonic<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We can place our rows directly into the diff field, and only keep the relevant one
        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
        // reduction. We start by mapping the group key along with the row and consolidating
        // if required to do so.
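        // Concretely, `explode_one` below moves each row into the diff position as
        // a `Top1Monoid`, whose semigroup addition keeps the minimal row per key,
        // so the subsequent reduction only has to copy out `input[0]`.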
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    // Scoped to allow borrow of `row` to drop.
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // It should now be possible to ensure that we have a monotonic collection and process it.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, _, _, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        // TODO(database-issues#2288): Here we discard the arranged output.
        (result.as_collection(|_k, v| v.into_owned()), errs)
    }
}

/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of the
/// emitted rows. This has the benefit that we have to maintain state proportional to the size of
/// the output instead of the size of the input.
///
/// Returns two arrangements:
/// * The arranged input data without modifications, and
/// * the maintained negated output data.
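///
/// For example (hypothetical data, a limit of 2, ascending order, no offset):
/// ```text
/// input:          {a: +1, b: +1, c: +1}
/// negated output: {c: -1}
/// concat:         {a: +1, b: +1}   <- the TopK of the input
/// ```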
fn build_topk_negated_stage<G, V, Bu, Tr>(
    input: &Collection<G, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
    Arranged<G, TraceAgent<Tr>>,
)
where
    G: Scope,
    G::Timestamp: Lattice + Columnation,
    V: MaybeValidatingRow<Row, Row>,
    Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
    Bu::Input: Container + PushInto<((Row, V), G::Timestamp, Diff)>,
    Tr: Trace
        + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff>
        + 'static,
    for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
    Tr::Batch: Batch,
    Arranged<G, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // We only want to arrange parts of the input that are not part of the actual output,
    // such that `input.concat(&negated_output)` yields the correct TopK.
    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
    // built-in view mz_introspection.mz_expected_group_size_advice.
    let arranged = input.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
        "Arranged TopK input",
    );

    let reduced = arranged.mz_reduce_abelian::<_, _, _, Bu, Tr>("Reduced TopK input", {
        move |mut hash_key, source, target: &mut Vec<(V, Diff)>| {
            // Unpack the limit, either into an integer literal or an expression to evaluate.
            let limit: Option<Diff> = limit.as_ref().map(|l| {
                if let Some(l) = l.as_literal_int64() {
                    l.into()
                } else {
                    // Unpack `key` after skipping the hash and determine the limit.
                    // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                    let temp_storage = mz_repr::RowArena::new();
                    let _hash = hash_key.next();
                    let mut key_datums = datum_vec.borrow();
                    key_datums.extend(hash_key);
                    let datum_limit = l
                        .eval(&key_datums, &temp_storage)
                        .unwrap_or(Datum::Int64(0));
                    if datum_limit == Datum::Null {
                        Diff::MAX
                    } else {
                        datum_limit.unwrap_int64().into()
                    }
                }
            });

            if let Some(err) = V::into_error() {
                for (datums, diff) in source.iter() {
                    if diff.is_positive() {
                        continue;
                    }
                    target.push((err((*datums).into_owned()), Diff::ONE));
                    return;
                }
            }

            // Determine if we must actually shrink the result set.
            let must_shrink = offset > 0
                || limit
                    .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                    .unwrap_or(false);
            if !must_shrink {
                return;
            }

            // First, retract all records; the ones that survive `offset` and `limit`
            // are re-emitted below. Note that we ensure `target` has the capacity to
            // hold at least these records, and avoid any dependency on the
            // user-provided (potentially unbounded) limit.
            target.reserve(source.len());
            for (datums, diff) in source.iter() {
                target.push((V::ok((*datums).into_owned()), -diff));
            }
            // Local copies that may count down to zero.
            let mut offset = offset;
            let mut limit = limit;

            // The order in which we should produce rows.
            let mut indexes = (0..source.len()).collect::<Vec<_>>();
            // We decode the datums once, into a common buffer for efficiency.
            // Each row should contain `arity` columns; we assert this below.
            let mut buffer = datum_vec.borrow();
            for (index, (datums, _)) in source.iter().enumerate() {
                buffer.extend(*datums);
                assert_eq!(buffer.len(), arity * (index + 1));
            }
            let width = buffer.len() / source.len();

            // TODO: Use arrangements or otherwise make the sort more performant?
            indexes.sort_by(|left, right| {
                let left = &buffer[left * width..][..width];
                let right = &buffer[right * width..][..width];
                // Note: source was originally ordered by the u8 array representation
                // of rows, but left.cmp(right) uses Datum::cmp.
                mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
            });

            // We now need to lay out the data in order of `buffer`, but respecting
            // the `offset` and `limit` constraints.
            for index in indexes.into_iter() {
                let (datums, mut diff) = source[index];
                if !diff.is_positive() {
                    continue;
                }
                // If we are still skipping early records ...
                if offset > 0 {
                    let to_skip =
                        std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                    offset -= to_skip;
                    diff -= Diff::try_from(to_skip).unwrap();
                }
                // We should produce at most `limit` records.
                if let Some(limit) = &mut limit {
                    diff = std::cmp::min(diff, Diff::from(*limit));
                    *limit -= diff;
                }
                // Output the indicated number of rows.
                if diff.is_positive() {
                    // Cancel the earlier retractions for the elements that are
                    // actually part of the set of TopK elements.
                    target.push((V::ok(datums.into_owned()), diff));
                }
            }
        }
    });
    (arranged, reduced)
}

fn render_intra_ts_thinning<S>(
    collection: Collection<S, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> Collection<S, (Row, Row), Diff>
where
    S: Scope,
    S::Timestamp: Lattice,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    let mut aggregates = BTreeMap::new();
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                while let Some((time, data)) = input.next() {
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.drain(..) {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Evaluate the limit, first as a constant and then against the key if needed.
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            // Unpack `key` and determine the limit.
                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    notificator.notify_at(time.retain());
                }

                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}

/// Types for in-place intra-ts aggregation of monotonic streams.
pub mod topk_agg {
    use differential_dataflow::consolidation;
    use smallvec::SmallVec;

    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
    // that. It would be nice to find a way to reuse some or all of the code from there.
    //
    // Additionally, because we're calling into DD's consolidate method, we are forced to work
    // with the `Ord` trait, which for the usage above means that we need to clone the `order_key`
    // for each record. It would be nice to also remove the need for cloning that piece of data.
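    /// A batch of `(item, diff)` updates that, upon compaction, retains only the
    /// first `limit` items in ascending order, counting multiplicities.
    ///
    /// A minimal usage sketch (hypothetical values; not compiled as a doc test):
    /// ```ignore
    /// let mut batch = TopKBatch::new(2);
    /// batch.update("b", 1);
    /// batch.update("a", 1);
    /// batch.update("c", 1);
    /// // Compaction happens in `into_iter`; only the two smallest items survive.
    /// assert_eq!(batch.into_iter().collect::<Vec<_>>(), vec![("a", 1), ("b", 1)]);
    /// ```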
    pub struct TopKBatch<T> {
        updates: SmallVec<[(T, i64); 16]>,
        clean: usize,
        limit: i64,
    }

    impl<T: Ord> TopKBatch<T> {
        pub fn new(limit: i64) -> Self {
            Self {
                updates: SmallVec::new(),
                clean: 0,
                limit,
            }
        }

        /// Adds a new update, for `item` with `value`.
        ///
        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
        /// half the length of the list, which would keep the total footprint within reasonable bounds
        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
        /// is worth paying without some experimentation.
        #[inline]
        pub fn update(&mut self, item: T, value: i64) {
            self.updates.push((item, value));
            self.maintain_bounds();
        }

        /// Compact the internal representation.
        ///
        /// This method sorts `self.updates` and consolidates updates with equal items, discarding
        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
        /// elements is non-zero.
        #[inline]
        pub fn compact(&mut self) {
            if self.clean < self.updates.len() && self.updates.len() > 1 {
                let len = consolidation::consolidate_slice(&mut self.updates);
                self.updates.truncate(len);

                // We can now retain only the first K records and throw away everything else.
                let mut limit = self.limit;
                self.updates.retain(|x| {
                    if limit > 0 {
                        limit -= x.1;
                        true
                    } else {
                        false
                    }
                });
                // By the end of the loop above `limit` will either be:
                // (a) Positive, in which case all updates were retained;
                // (b) Zero, in which case we discarded all updates after limit became zero;
                // (c) Negative, in which case the last record we retained had more copies
                // than necessary. In this latter case, we need to do one final adjustment
                // of the diff field of the last record so that the total sum of the diffs
                // in the batch is K.
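                // For example (hypothetical): with `limit = 3` and consolidated
                // updates [(a, 2), (b, 2)], both records are retained and `limit`
                // ends at -1, so the adjustment below rewrites (b, 2) to (b, 1),
                // making the retained diffs sum to exactly 3.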
                if limit < 0 {
                    if let Some(item) = self.updates.last_mut() {
                        // We are subtracting the limit *negated*, therefore we are subtracting a value
                        // that is *greater* than or equal to zero, which represents the excess.
                        item.1 -= -limit;
                    }
                }
            }
            self.clean = self.updates.len();
        }

        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
        /// This function tries to minimize work by only compacting if enough work has accumulated.
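        /// For example (hypothetical): with 33 pending updates of which 10 are
        /// clean, `33 > 32` and `33 >> 1 == 16 >= 10`, so we compact.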
        fn maintain_bounds(&mut self) {
            // If we have more than 32 elements and at least half of them are not clean, compact.
            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
                self.compact()
            }
        }
    }

    impl<T: Ord> IntoIterator for TopKBatch<T> {
        type Item = (T, i64);
        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;

        fn into_iter(mut self) -> Self::IntoIter {
            self.compact();
            self.updates.into_iter()
        }
    }
}

/// Monoids for in-place compaction of monotonic streams.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a row and an ordering.
    #[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in Top1Monoid is idempotent, and its
            // users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert_eq!(self.order_key, other.order_key);

            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// A shared portion of a thread-local top-1 monoid implementation.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A monoid containing a row and a shared pointer to a shared structure.
    /// Only suitable for thread-local aggregations.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            false
        }
    }
}