mz_compute/render/top_k.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! TopK execution logic.
//!
//! Consult [TopKPlan] documentation for details.
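//!
//! This module renders the three [TopKPlan] variants: [MonotonicTop1Plan],
//! [MonotonicTopKPlan], and [BasicTopKPlan].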

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

use differential_dataflow::Data;
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
use differential_dataflow::trace::{Builder, Trace};
use differential_dataflow::{AsCollection, Collection};
use mz_compute_types::plan::top_k::{
    BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
};
use mz_expr::func::CastUint64ToInt64;
use mz_expr::{BinaryFunc, EvalError, MirScalarExpr, UnaryFunc};
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_repr::{Datum, DatumVec, Diff, Row, ScalarType, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use timely::Container;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;

use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::render::Pairer;
use crate::render::context::{CollectionBundle, Context};
use crate::render::errors::MaybeValidatingRow;
use crate::row_spine::{
    DatumSeq, RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowValBuilder, RowValSpine,
};
use crate::typedefs::{KeyBatcher, MzTimestamp, RowRowSpine, RowSpine};

// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
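    /// Renders a [TopKPlan] against the input collection bundle, returning a
    /// bundle with the reduced output and any errors produced by limit
    /// evaluation or input validation.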
    pub(crate) fn render_topk(
        &self,
        input: CollectionBundle<G>,
        top_k_plan: TopKPlan,
    ) -> CollectionBundle<G> {
        let (ok_input, err_input) = input.as_specific_collection(None, &self.config_set);

        // We create a new region to compartmentalize the topk logic.
        let (ok_result, err_collection) = ok_input.scope().region_named("TopK", |inner| {
            let ok_input = ok_input.enter_region(inner);
            let mut err_collection = err_input.enter_region(inner);

            // Determine if there should be errors due to limit evaluation; update `err_collection`.
            // TODO(vmarcos): We evaluate the limit expression below for each input update. There
            // is an opportunity to do so for every group key instead if the error handling is
            // integrated with: 1. The intra-timestamp thinning step in monotonic top-k, e.g., by
            // adding an error output there; 2. The validating reduction on basic top-k
            // (database-issues#7108).

            match top_k_plan.limit().map(|l| (l.as_literal(), l)) {
                None => {}
                Some((Some(Ok(literal)), _))
                    if literal == Datum::Null || literal.unwrap_int64() >= 0 => {}
                Some((_, expr)) => {
                    // Produce errors from limit selectors that error or are
                    // negative, and nothing from limit selectors that do
                    // not. Note that even if expr.could_error() is false,
                    // the expression might still return a negative limit and
                    // thus needs to be checked.
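                    // E.g., a literal `LIMIT -1` lands in this arm and yields an
                    // `EvalError::NegLimit` for every input row, while `LIMIT NULL`
                    // matches the arm above and is later treated as "no limit".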
                    let expr = expr.clone();
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let errors = ok_input.flat_map(move |row| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(&row);
                        match expr.eval(&datums[..], &temp_storage) {
                            Ok(l) if l != Datum::Null && l.unwrap_int64() < 0 => {
                                Some(EvalError::NegLimit.into())
                            }
                            Ok(_) => None,
                            Err(e) => Some(e.into()),
                        }
                    });
                    err_collection = err_collection.concat(&errors);
                }
            }

            let ok_result = match top_k_plan {
                TopKPlan::MonotonicTop1(MonotonicTop1Plan {
                    group_key,
                    order_key,
                    must_consolidate,
                }) => {
                    let (oks, errs) = self.render_top1_monotonic(
                        ok_input,
                        group_key,
                        order_key,
                        must_consolidate,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
                TopKPlan::MonotonicTopK(MonotonicTopKPlan {
                    order_key,
                    group_key,
                    arity,
                    mut limit,
                    must_consolidate,
                }) => {
                    // Must permute `limit` to reference `group_key` elements as if in order.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }
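                    // E.g., with `group_key = [3, 1]` the map is {3 -> 0, 1 -> 1},
                    // so a limit expression that referenced input column 3 now
                    // references key column 0.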

                    // Map the group key along with the row and consolidate if required to do so.
                    let mut datum_vec = mz_repr::DatumVec::new();
                    let collection = ok_input
                        .map(move |row| {
                            let group_row = {
                                let datums = datum_vec.borrow_with(&row);
                                SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                            };
                            (group_row, row)
                        })
                        .consolidate_named_if::<KeyBatcher<_, _, _>>(
                            must_consolidate,
                            "Consolidated MonotonicTopK input",
                        );

                    // It should now be possible to ensure that we have a monotonic collection.
                    let error_logger = self.error_logger();
                    let (collection, errs) = collection.ensure_monotonic(move |data, diff| {
                        error_logger.log(
                            "Non-monotonic input to MonotonicTopK",
                            &format!("data={data:?}, diff={diff}"),
                        );
                        let m = "tried to build monotonic top-k on non-monotonic input".into();
                        (DataflowError::from(EvalError::Internal(m)), Diff::ONE)
                    });
                    err_collection = err_collection.concat(&errs);

                    // For monotonic inputs, we are able to thin the input relation in two stages:
                    // 1. First, we can do an intra-timestamp thinning which has the advantage of
                    //    being computed in a streaming fashion, even for the initial snapshot.
                    // 2. Then, we can do inter-timestamp thinning by feeding back negations for
                    //    any records that have been invalidated.
                    let collection = if let Some(limit) = limit.clone() {
                        render_intra_ts_thinning(collection, order_key.clone(), limit)
                    } else {
                        collection
                    };

                    let pairer = Pairer::new(1);
                    let collection = collection.map(move |(group_row, row)| {
                        let hash = row.hashed();
                        let hash_key = pairer.merge(std::iter::once(Datum::from(hash)), &group_row);
                        (hash_key, row)
                    });

                    // For monotonic inputs, we are able to retract inputs that can no longer be produced
                    // as outputs. Any inputs beyond `offset + limit` will never again be produced as
                    // outputs, and can be removed. The simplest form of this is when `offset == 0` and
                    // these removable records are those in the input not produced in the output.
                    // TODO: consider broadening this optimization to `offset > 0` by first filtering
                    // down to `offset = 0` and `limit = offset + limit`, followed by a finishing act
                    // of `offset` and `limit`, discarding only the records not produced in the intermediate
                    // stage.
                    use differential_dataflow::operators::iterate::Variable;
                    let delay = std::time::Duration::from_secs(10);
                    let retractions = Variable::new(
                        &mut ok_input.scope(),
                        <G::Timestamp as crate::render::RenderTimestamp>::system_delay(
                            delay.try_into().expect("must fit"),
                        ),
                    );
                    let thinned = collection.concat(&retractions.negate());

                    // As an additional optimization, we can skip creating the full topk hierarchy
                    // here since we now have an upper bound on the number of records due to the
                    // intra-ts thinning. The maximum number of records per timestamp is
                    // (num_workers * limit), which we expect to be a small number and so we render
                    // a single topk stage.
                    let (result, errs) =
                        self.build_topk_stage(thinned, order_key, 1u64, 0, limit, arity, false);
                    // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
                    let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
                        "Monotonic TopK final consolidate",
                    );
                    retractions.set(&collection.concat(&result.negate()));
                    soft_assert_or_log!(
                        errs.is_none(),
                        "requested no validation, but received error collection"
                    );

                    result.map(|(_key_hash, row)| row)
                }
                TopKPlan::Basic(BasicTopKPlan {
                    group_key,
                    order_key,
                    offset,
                    mut limit,
                    arity,
                    buckets,
                }) => {
                    // Must permute `limit` to reference `group_key` elements as if in order.
                    if let Some(expr) = limit.as_mut() {
                        let mut map = BTreeMap::new();
                        for (index, column) in group_key.iter().enumerate() {
                            map.insert(*column, index);
                        }
                        expr.permute_map(&map);
                    }

                    let (oks, errs) = self.build_topk(
                        ok_input, group_key, order_key, offset, limit, arity, buckets,
                    );
                    err_collection = err_collection.concat(&errs);
                    oks
                }
            };

            // Extract the results from the region.
            (ok_result.leave_region(), err_collection.leave_region())
        });

        CollectionBundle::from_collections(ok_result, err_collection)
    }

    /// Constructs a TopK dataflow subgraph.
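    ///
    /// The input is keyed by a hash of the group key prepended to the group key
    /// itself. When a `limit` is present, one reduction stage is rendered per
    /// entry in `buckets`, each restricted by a limit that folds in `offset`;
    /// a final stage then completes the reduction and applies `offset` itself.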
    fn build_topk<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        buckets: Vec<u64>,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        let pairer = Pairer::new(1);
        let mut datum_vec = mz_repr::DatumVec::new();
        let mut collection = collection.map({
            move |row| {
                let group_row = {
                    let row_hash = row.hashed();
                    let datums = datum_vec.borrow_with(&row);
                    let iterator = group_key.iter().map(|i| datums[*i]);
                    pairer.merge(std::iter::once(Datum::from(row_hash)), iterator)
                };
                (group_row, row)
            }
        });

        let mut validating = true;
        let mut err_collection: Option<Collection<S, _, _>> = None;

        if let Some(mut limit) = limit.clone() {
            // We may need a new `limit` that reflects the addition of `offset`.
            // Ideally we compile it down to a literal if at all possible.
            if offset > 0 {
                let new_limit = (|| {
                    let limit = limit.as_literal_int64()?;
                    let offset = i64::try_from(offset).ok()?;
                    limit.checked_add(offset)
                })();

                if let Some(new_limit) = new_limit {
                    limit = MirScalarExpr::literal_ok(Datum::Int64(new_limit), ScalarType::Int64);
                } else {
                    limit = limit.call_binary(
                        MirScalarExpr::literal_ok(
                            Datum::UInt64(u64::cast_from(offset)),
                            ScalarType::UInt64,
                        )
                        .call_unary(UnaryFunc::CastUint64ToInt64(CastUint64ToInt64)),
                        BinaryFunc::AddInt64,
                    );
                }
            }
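            // E.g., `OFFSET 5 LIMIT 10` runs the intermediate stages with an
            // effective limit of 15; the first 5 records of each group are only
            // discarded by the final stage below.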

            // These bucket values define the moduli applied to the 64-bit hash of the
            // record, and should have the properties that 1. there are not too many of
            // them, and 2. each has a modest difference to the next.
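            // E.g., a descending sequence of powers of 16 (an illustrative choice;
            // the actual values come from the plan) makes each stage merge sixteen
            // groups of the previous, finer stage.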
            for bucket in buckets.into_iter() {
                // Here we do not apply `offset`, but instead restrict ourselves with a limit
                // that includes the offset. We cannot apply `offset` until we perform the
                // final, complete reduction.
                let (oks, errs) = self.build_topk_stage(
                    collection,
                    order_key.clone(),
                    bucket,
                    0,
                    Some(limit.clone()),
                    arity,
                    validating,
                );
                collection = oks;
                if validating {
                    err_collection = errs;
                    validating = false;
                }
            }
        }

        // We do a final step, both to make sure that we complete the reduction, and to correctly
        // apply `offset` to the final group, as we have not yet been applying it to the partially
        // formed groups.
        let (oks, errs) = self.build_topk_stage(
            collection, order_key, 1u64, offset, limit, arity, validating,
        );
        // Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
        let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
        collection = oks;
        if validating {
            err_collection = errs;
        }
        (
            collection.map(|(_key_hash, row)| row),
            err_collection.expect("at least one stage validated its inputs"),
        )
    }

    /// To provide a robust incremental orderby-limit experience, we want to avoid grouping *all*
    /// records (or even large groups) and then applying the ordering and limit. Instead, a more
    /// robust approach forms groups of bounded size and applies the offset and limit to each,
    /// and then increases the sizes of the groups.
    ///
    /// Builds a "stage", which uses a finer grouping than is required to reduce the volume of
    /// updates, and to reduce the amount of work on the critical path for updates. The cost is
    /// a larger number of arrangements when this optimization does nothing beneficial.
    ///
    /// The function accepts a collection of the form `(hash_key, row)`, a modulus it applies to the
    /// `hash_key`'s hash datum, an `offset` for returning results, and a `limit` to restrict the
    /// output size. `arity` represents the number of columns in the input data, and
    /// if `validating` is true, we check for negative multiplicities, which indicate
    /// an error in the input data.
    ///
    /// The output of this function is _not consolidated_.
    ///
    /// The dataflow fragment has the following shape:
    /// ```text
    ///     | input
    ///     |
    ///   arrange
    ///     |\
    ///     | \
    ///     |  reduce
    ///     |  |
    ///     concat
    ///     |
    ///     | output
    /// ```
    /// There are additional map/flat_map operators as well as error demuxing operators, but we're
    /// omitting them here for the sake of simplicity.
    fn build_topk_stage<S>(
        &self,
        collection: Collection<S, (Row, Row), Diff>,
        order_key: Vec<mz_expr::ColumnOrder>,
        modulus: u64,
        offset: usize,
        limit: Option<mz_expr::MirScalarExpr>,
        arity: usize,
        validating: bool,
    ) -> (
        Collection<S, (Row, Row), Diff>,
        Option<Collection<S, DataflowError, Diff>>,
    )
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // Form appropriate input by updating the `hash` column (first datum in `hash_key`) by
        // applying `modulus`.
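        // With `modulus == 1`, as in the final stage, every record maps to hash 0,
        // so each group is reduced in its entirety.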
        let input = collection.map(move |(hash_key, row)| {
            let mut hash_key_iter = hash_key.iter();
            let hash = hash_key_iter.next().unwrap().unwrap_uint64() % modulus;
            let hash_key = SharedRow::pack(std::iter::once(hash.into()).chain(hash_key_iter));
            (hash_key, row)
        });

        // If validating, demux errors; otherwise we cannot produce errors.
        let (input, oks, errs) = if validating {
            // Build topk stage, produce errors for invalid multiplicities.
            let (input, stage) = build_topk_negated_stage::<
                S,
                RowValBuilder<_, _, _>,
                RowValSpine<Result<Row, Row>, _, _>,
            >(&input, order_key, offset, limit, arity);
            let stage = stage.as_collection(|k, v| (k.to_row(), v.clone()));

            // Demux oks and errors.
            let error_logger = self.error_logger();
            type CB<C> = CapacityContainerBuilder<C>;
            let (oks, errs) = stage.map_fallible::<CB<_>, CB<_>, _, _, _>(
                "Demuxing Errors",
                move |(hk, result)| match result {
                    Err(v) => {
                        let mut hk_iter = hk.iter();
                        let h = hk_iter.next().unwrap().unwrap_uint64();
                        let k = SharedRow::pack(hk_iter);
                        let message = "Negative multiplicities in TopK";
                        error_logger.log(message, &format!("k={k:?}, h={h}, v={v:?}"));
                        Err(EvalError::Internal(message.into()).into())
                    }
                    Ok(t) => Ok((hk, t)),
                },
            );
            (input, oks, Some(errs))
        } else {
            // Build non-validating topk stage.
            let (input, stage) =
                build_topk_negated_stage::<S, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                    &input, order_key, offset, limit, arity,
                );
            // Turn arrangement into collection.
            let stage = stage.as_collection(|k, v| (k.to_row(), v.to_row()));

            (input, stage, None)
        };
        let input = input.as_collection(|k, v| (k.to_row(), v.to_row()));
        (oks.concat(&input), errs)
    }
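    /// Renders a monotonic Top1 plan by folding each group's rows into a
    /// `monoids::Top1Monoid` carried in the diff field, which avoids a
    /// hierarchical reduction.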
    fn render_top1_monotonic<S>(
        &self,
        collection: Collection<S, Row, Diff>,
        group_key: Vec<usize>,
        order_key: Vec<mz_expr::ColumnOrder>,
        must_consolidate: bool,
    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
    where
        S: Scope<Timestamp = G::Timestamp>,
    {
        // We can place our rows directly into the diff field, and only keep the relevant one
        // corresponding to evaluating our aggregate, instead of having to do a hierarchical
        // reduction. We start by mapping the group key along with the row and consolidating
        // if required to do so.
        let collection = collection
            .map({
                let mut datum_vec = mz_repr::DatumVec::new();
                move |row| {
                    // Scoped to allow borrow of `row` to drop.
                    let group_key = {
                        let datums = datum_vec.borrow_with(&row);
                        SharedRow::pack(group_key.iter().map(|i| datums[*i]))
                    };
                    (group_key, row)
                }
            })
            .consolidate_named_if::<KeyBatcher<_, _, _>>(
                must_consolidate,
                "Consolidated MonotonicTop1 input",
            );

        // It should now be possible to ensure that we have a monotonic collection and process it.
        let error_logger = self.error_logger();
        let (partial, errs) = collection.ensure_monotonic(move |data, diff| {
            error_logger.log(
                "Non-monotonic input to MonotonicTop1",
                &format!("data={data:?}, diff={diff}"),
            );
            let m = "tried to build monotonic top-1 on non-monotonic input".into();
            (EvalError::Internal(m).into(), Diff::ONE)
        });
        let partial: KeyCollection<_, _, _> = partial
            .explode_one(move |(group_key, row)| {
                (
                    group_key,
                    monoids::Top1Monoid {
                        row,
                        order_key: order_key.clone(),
                    },
                )
            })
            .into();
        let result = partial
            .mz_arrange::<RowBatcher<_, _>, RowBuilder<_, _>, RowSpine<_, _>>(
                "Arranged MonotonicTop1 partial [val: empty]",
            )
            .mz_reduce_abelian::<_, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                "MonotonicTop1",
                move |_key, input, output| {
                    let accum: &monoids::Top1Monoid = &input[0].1;
                    output.push((accum.row.clone(), Diff::ONE));
                },
            );
        // TODO(database-issues#2288): Here we discard the arranged output.
        (result.as_collection(|_k, v| v.to_row()), errs)
    }
}

/// Build a stage of a topk reduction. Maintains the _retractions_ of the output instead of the
/// emitted rows. This has the benefit that we have to maintain state proportional to the size of
/// the output instead of the size of the input.
///
/// Returns two arrangements:
/// * The arranged input data without modifications, and
/// * the maintained negated output data.
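///
/// For example, with ascending order, `limit = 2`, and a group containing rows
/// `{a, b, c}`, the reduction's negated output holds `(c, -1)`, so that
/// `input.concat(&negated_output)` yields `{a, b}`.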
fn build_topk_negated_stage<G, Bu, Tr>(
    input: &Collection<G, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    offset: usize,
    limit: Option<mz_expr::MirScalarExpr>,
    arity: usize,
) -> (
    Arranged<G, TraceAgent<RowRowSpine<G::Timestamp, Diff>>>,
    Arranged<G, TraceAgent<Tr>>,
)
where
    G: Scope,
    G::Timestamp: MzTimestamp,
    Bu: Builder<
            Time = G::Timestamp,
            Input: Container + PushInto<((Row, Tr::ValOwn), G::Timestamp, Diff)>,
            Output = Tr::Batch,
        >,
    Tr: for<'a> Trace<
            Key<'a> = DatumSeq<'a>,
            KeyOwn = Row,
            ValOwn: Data + MaybeValidatingRow<Row, Row>,
            Time = G::Timestamp,
            Diff = Diff,
        > + 'static,
    Arranged<G, TraceAgent<Tr>>: ArrangementSize,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    // We only want to arrange parts of the input that are not part of the actual output
    // such that `input.concat(&negated_output)` yields the correct TopK.
    // NOTE(vmarcos): The arranged input operator name below is used in the tuning advice
    // built-in view mz_introspection.mz_expected_group_size_advice.
    let arranged = input.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
        "Arranged TopK input",
    );

    // Eagerly evaluate literal limits; a `NULL` limit means "no limit".
    let limit = limit.map(|l| match l.as_literal() {
        Some(Ok(Datum::Null)) => Ok(Diff::MAX),
        Some(Ok(d)) => Ok(Diff::from(d.unwrap_int64())),
        _ => Err(l),
    });

    let reduced = arranged.mz_reduce_abelian::<_, Bu, Tr>("Reduced TopK input", {
        move |mut hash_key, source, target: &mut Vec<(Tr::ValOwn, Diff)>| {
            // Unpack the limit, either into an integer literal or an expression to evaluate.
            let limit = match &limit {
                Some(Ok(lit)) => Some(*lit),
                Some(Err(expr)) => {
                    // Unpack `key` after skipping the hash and determine the limit.
                    // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                    let temp_storage = mz_repr::RowArena::new();
                    let _hash = hash_key.next();
                    let mut key_datums = datum_vec.borrow();
                    key_datums.extend(hash_key);
                    let datum_limit = expr
                        .eval(&key_datums, &temp_storage)
                        .unwrap_or(Datum::Int64(0));
                    Some(match datum_limit {
                        Datum::Null => Diff::MAX,
                        d => Diff::from(d.unwrap_int64()),
                    })
                }
                None => None,
            };

            if let Some(err) = Tr::ValOwn::into_error() {
                for (datums, diff) in source.iter() {
                    if diff.is_positive() {
                        continue;
                    }
                    target.push((err((*datums).to_row()), Diff::ONE));
                    return;
                }
            }

            // Determine if we must actually shrink the result set.
            let must_shrink = offset > 0
                || limit
                    .map(|l| source.iter().map(|(_, d)| *d).sum::<Diff>() > l)
                    .unwrap_or(false);
            if !must_shrink {
                return;
            }

            // First go ahead and emit all records. Note that we ensure target
            // has the capacity to hold at least these records, and avoid any
            // dependencies on the user-provided (potentially unbounded) limit.
            target.reserve(source.len());
            for (datums, diff) in source.iter() {
                target.push((Tr::ValOwn::ok((*datums).to_row()), -diff));
            }
            // Local copies that may count down to zero.
            let mut offset = offset;
            let mut limit = limit;

            // The order in which we should produce rows.
            let mut indexes = (0..source.len()).collect::<Vec<_>>();
            // We decode the datums once, into a common buffer for efficiency.
            // Each row should contain `arity` columns; we should check that.
            let mut buffer = datum_vec.borrow();
            for (index, (datums, _)) in source.iter().enumerate() {
                buffer.extend(*datums);
                assert_eq!(buffer.len(), arity * (index + 1));
            }
            let width = buffer.len() / source.len();

            // TODO: use arrangements or otherwise make the sort more performant?
            indexes.sort_by(|left, right| {
                let left = &buffer[left * width..][..width];
                let right = &buffer[right * width..][..width];
                // Note: source was originally ordered by the u8 array representation
                // of rows, but left.cmp(right) uses Datum::cmp.
                mz_expr::compare_columns(&order_key, left, right, || left.cmp(right))
            });

            // We now need to lay out the data in order of `buffer`, but respecting
            // the `offset` and `limit` constraints.
            for index in indexes.into_iter() {
                let (datums, mut diff) = source[index];
                if !diff.is_positive() {
                    continue;
                }
                // If we are still skipping early records ...
                if offset > 0 {
                    let to_skip =
                        std::cmp::min(offset, usize::try_from(diff.into_inner()).unwrap());
                    offset -= to_skip;
                    diff -= Diff::try_from(to_skip).unwrap();
                }
                // We should produce at most `limit` records.
                if let Some(limit) = &mut limit {
                    diff = std::cmp::min(diff, Diff::from(*limit));
                    *limit -= diff;
                }
                // Output the indicated number of rows.
                if diff.is_positive() {
                    // Re-emit the elements actually part of the set of TopK
                    // elements, canceling the blanket retractions issued above.
                    target.push((Tr::ValOwn::ok(datums.to_row()), diff));
                }
            }
        }
    });
    (arranged, reduced)
}
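/// Thins a monotonic `(group, row)` collection within each timestamp, keeping
/// per group at most as many rows as the evaluated `limit` so that downstream
/// stages see a bounded number of records per time.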
fn render_intra_ts_thinning<S>(
    collection: Collection<S, (Row, Row), Diff>,
    order_key: Vec<mz_expr::ColumnOrder>,
    limit: mz_expr::MirScalarExpr,
) -> Collection<S, (Row, Row), Diff>
where
    S: Scope,
    S::Timestamp: Lattice,
{
    let mut datum_vec = mz_repr::DatumVec::new();

    let mut aggregates = BTreeMap::new();
    let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
        order_key,
        left: DatumVec::new(),
        right: DatumVec::new(),
    }));
    collection
        .inner
        .unary_notify(
            Pipeline,
            "TopKIntraTimeThinning",
            [],
            move |input, output, notificator| {
                while let Some((time, data)) = input.next() {
                    let agg_time = aggregates
                        .entry(time.time().clone())
                        .or_insert_with(BTreeMap::new);
                    for ((grp_row, row), record_time, diff) in data.drain(..) {
                        let monoid = monoids::Top1MonoidLocal {
                            row,
                            shared: Rc::clone(&shared),
                        };

                        // Evaluate the limit, first as a constant and then against the key if needed.
                        let limit = if let Some(l) = limit.as_literal_int64() {
                            l
                        } else {
                            let temp_storage = mz_repr::RowArena::new();
                            let key_datums = datum_vec.borrow_with(&grp_row);
                            // Unpack `key` and determine the limit.
                            // If the limit errors, use a zero limit; errors are surfaced elsewhere.
                            let datum_limit = limit
                                .eval(&key_datums, &temp_storage)
                                .unwrap_or(mz_repr::Datum::Int64(0));
                            if datum_limit == Datum::Null {
                                i64::MAX
                            } else {
                                datum_limit.unwrap_int64()
                            }
                        };

                        let topk = agg_time
                            .entry((grp_row, record_time))
                            .or_insert_with(move || topk_agg::TopKBatch::new(limit));
                        topk.update(monoid, diff.into_inner());
                    }
                    notificator.notify_at(time.retain());
                }

                notificator.for_each(|time, _, _| {
                    if let Some(aggs) = aggregates.remove(time.time()) {
                        let mut session = output.session(&time);
                        for ((grp_row, record_time), topk) in aggs {
                            session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
                                (
                                    (grp_row.clone(), monoid.into_row()),
                                    record_time.clone(),
                                    diff.into(),
                                )
                            }))
                        }
                    }
                });
            },
        )
        .as_collection()
}

/// Types for in-place intra-ts aggregation of monotonic streams.
pub mod topk_agg {
    use differential_dataflow::consolidation;
    use smallvec::SmallVec;

    // TODO: This struct looks a lot like ChangeBatch and indeed its code is a modified version of
    // that. It would be nice to find a way to reuse some or all of the code from there.
    //
    // Additionally, because we're calling into DD's consolidate method we are forced to work with
    // the `Ord` trait which for the usage we do above means that we need to clone the `order_key`
    // for each record. It would be nice to also remove the need for cloning that piece of data.
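    /// A batch of `(item, diff)` updates that retains at most `limit` copies of
    /// the smallest items, compacting lazily as updates accumulate.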
    pub struct TopKBatch<T> {
        updates: SmallVec<[(T, i64); 16]>,
        clean: usize,
        limit: i64,
    }

    impl<T: Ord> TopKBatch<T> {
        pub fn new(limit: i64) -> Self {
            Self {
                updates: SmallVec::new(),
                clean: 0,
                limit,
            }
        }

        /// Adds a new update, for `item` with `value`.
        ///
        /// This could be optimized to perform compaction when the number of "dirty" elements exceeds
        /// half the length of the list, which would keep the total footprint within reasonable bounds
        /// even under an arbitrary number of updates. This has a cost, and it isn't clear whether it
        /// is worth paying without some experimentation.
        #[inline]
        pub fn update(&mut self, item: T, value: i64) {
            self.updates.push((item, value));
            self.maintain_bounds();
        }

        /// Compact the internal representation.
        ///
        /// This method sorts `self.updates` and consolidates elements with equal items, discarding
        /// any whose accumulation is zero. It is optimized to only do this if the number of dirty
        /// elements is non-zero.
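        ///
        /// For example, with `limit = 3` and consolidated updates `[(a, 2), (b, 2)]`,
        /// the retain pass keeps both entries but drives `limit` to -1, so the final
        /// adjustment rewrites the last entry to `(b, 1)` and the diffs sum to 3.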
        #[inline]
        pub fn compact(&mut self) {
            if self.clean < self.updates.len() && self.updates.len() > 1 {
                let len = consolidation::consolidate_slice(&mut self.updates);
                self.updates.truncate(len);

                // We can now retain only the first K records and throw away everything else.
                let mut limit = self.limit;
                self.updates.retain(|x| {
                    if limit > 0 {
                        limit -= x.1;
                        true
                    } else {
                        false
                    }
                });
                // By the end of the loop above `limit` will either be:
                // (a) Positive, in which case all updates were retained;
                // (b) Zero, in which case we discarded all updates after limit became zero;
                // (c) Negative, in which case the last record we retained had more copies
                // than necessary. In this latter case, we need to do one final adjustment
                // of the diff field of the last record so that the total sum of the diffs
                // in the batch is K.
                if limit < 0 {
                    if let Some(item) = self.updates.last_mut() {
                        // We are subtracting the limit *negated*, therefore we are subtracting a value
                        // that is *greater* than or equal to zero, which represents the excess.
                        item.1 -= -limit;
                    }
                }
            }
            self.clean = self.updates.len();
        }

        /// Maintain the bounds of pending (non-compacted) updates versus clean (compacted) data.
        /// This function tries to minimize work by only compacting if enough work has accumulated.
        fn maintain_bounds(&mut self) {
            // If we have more than 32 elements and at least half of them are not clean, compact.
            if self.updates.len() > 32 && self.updates.len() >> 1 >= self.clean {
                self.compact()
            }
        }
    }

    impl<T: Ord> IntoIterator for TopKBatch<T> {
        type Item = (T, i64);
        type IntoIter = smallvec::IntoIter<[(T, i64); 16]>;

        fn into_iter(mut self) -> Self::IntoIter {
            self.compact();
            self.updates.into_iter()
        }
    }
}

/// Monoids for in-place compaction of monotonic streams.
pub mod monoids {
    use std::cell::RefCell;
    use std::cmp::Ordering;
    use std::hash::{Hash, Hasher};
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, Region};
    use differential_dataflow::difference::{IsZero, Multiply, Semigroup};
    use mz_expr::ColumnOrder;
    use mz_repr::{DatumVec, Diff, Row};
    use serde::{Deserialize, Serialize};

    /// A monoid containing a row and an ordering.
    #[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Hash, Default)]
    pub struct Top1Monoid {
        pub row: Row,
        pub order_key: Vec<ColumnOrder>,
    }

    impl Clone for Top1Monoid {
        #[inline]
        fn clone(&self) -> Self {
            Self {
                row: self.row.clone(),
                order_key: self.order_key.clone(),
            }
        }

        #[inline]
        fn clone_from(&mut self, source: &Self) {
            self.row.clone_from(&source.row);
            self.order_key.clone_from(&source.order_key);
        }
    }

    impl Multiply<Diff> for Top1Monoid {
        type Output = Self;

        fn multiply(self, factor: &Diff) -> Self {
            // Multiplication in Top1Monoid is idempotent, and its
            // users must ascertain its monotonicity beforehand
            // (typically with ensure_monotonic) since it has no zero
            // value for us to use here.
            assert!(factor.is_positive());
            self
        }
    }

    impl Ord for Top1Monoid {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert_eq!(self.order_key, other.order_key);

            // It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
            // have to store the decoded Datums in the same struct as the Row, which gets tricky.
            let left: Vec<_> = self.row.unpack();
            let right: Vec<_> = other.row.unpack();
            mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
        }
    }
    impl PartialOrd for Top1Monoid {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1Monoid {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1Monoid {
        fn is_zero(&self) -> bool {
            false
        }
    }

    impl Columnation for Top1Monoid {
        type InnerRegion = Top1MonoidRegion;
    }

    #[derive(Default)]
    pub struct Top1MonoidRegion {
        row_region: <Row as Columnation>::InnerRegion,
        order_key_region: <Vec<ColumnOrder> as Columnation>::InnerRegion,
    }

    impl Region for Top1MonoidRegion {
        type Item = Top1Monoid;

        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            let row = unsafe { self.row_region.copy(&item.row) };
            let order_key = unsafe { self.order_key_region.copy(&item.order_key) };
            Self::Item { row, order_key }
        }

        fn clear(&mut self) {
            self.row_region.clear();
            self.order_key_region.clear();
        }

        fn reserve_items<'a, I>(&mut self, items1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self::Item> + Clone,
        {
            let items2 = items1.clone();
            self.row_region
                .reserve_items(items1.into_iter().map(|s| &s.row));
            self.order_key_region
                .reserve_items(items2.into_iter().map(|s| &s.order_key));
        }

        fn reserve_regions<'a, I>(&mut self, regions1: I)
        where
            Self: 'a,
            I: Iterator<Item = &'a Self> + Clone,
        {
            let regions2 = regions1.clone();
            self.row_region
                .reserve_regions(regions1.into_iter().map(|s| &s.row_region));
            self.order_key_region
                .reserve_regions(regions2.into_iter().map(|s| &s.order_key_region));
        }

        fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            self.row_region.heap_size(&mut callback);
            self.order_key_region.heap_size(callback);
        }
    }

    /// A shared portion of a thread-local top-1 monoid implementation.
    #[derive(Debug)]
    pub struct Top1MonoidShared {
        pub order_key: Vec<ColumnOrder>,
        pub left: DatumVec,
        pub right: DatumVec,
    }

    /// A monoid containing a row and a shared pointer to state common to all
    /// instances. Only suitable for thread-local aggregations.
    #[derive(Debug, Clone)]
    pub struct Top1MonoidLocal {
        pub row: Row,
        pub shared: Rc<RefCell<Top1MonoidShared>>,
    }

    impl Top1MonoidLocal {
        pub fn into_row(self) -> Row {
            self.row
        }
    }

    impl PartialEq for Top1MonoidLocal {
        fn eq(&self, other: &Self) -> bool {
            self.row.eq(&other.row)
        }
    }

    impl Eq for Top1MonoidLocal {}

    impl Hash for Top1MonoidLocal {
        fn hash<H: Hasher>(&self, state: &mut H) {
            self.row.hash(state);
        }
    }

    impl Ord for Top1MonoidLocal {
        fn cmp(&self, other: &Self) -> Ordering {
            debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
            let Top1MonoidShared {
                left,
                right,
                order_key,
            } = &mut *self.shared.borrow_mut();

            let left = left.borrow_with(&self.row);
            let right = right.borrow_with(&other.row);
            mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
        }
    }

    impl PartialOrd for Top1MonoidLocal {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Semigroup for Top1MonoidLocal {
        fn plus_equals(&mut self, rhs: &Self) {
            let cmp = (*self).cmp(rhs);
            // NB: Reminder that TopK returns the _minimum_ K items.
            if cmp == Ordering::Greater {
                self.clone_from(rhs);
            }
        }
    }

    impl IsZero for Top1MonoidLocal {
        fn is_zero(&self) -> bool {
            false
        }
    }
}