Skip to main content

mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23    ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_COMPUTE_TEMPORAL_BUCKETING,
24    TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::builder::ColumnBuilder;
34use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
35use mz_timely_util::operator::CollectionExt;
36use timely::container::CapacityContainerBuilder;
37use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
38use timely::dataflow::operators::Capability;
39use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
40use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
41use timely::dataflow::{Scope, StreamVec};
42use timely::progress::operate::FrontierInterest;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::{DataflowErrorSer, ErrorLogger};
48use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
52};
53
54/// Dataflow-local collections and arrangements.
55///
56/// A context means to wrap available data assets and present them in an easy-to-use manner.
57/// These assets include dataflow-local collections and arrangements, as well as imported
58/// arrangements from outside the dataflow.
59///
60/// Context has a timestamp type `T`, which is the timestamp used by the scope in question.
61pub struct Context<'scope, T: RenderTimestamp> {
62    /// The scope within which all managed collections exist.
63    ///
64    /// It is an error to add any collections not contained in this scope.
65    pub(crate) scope: Scope<'scope, T>,
66    /// The debug name of the dataflow associated with this context.
67    pub debug_name: String,
68    /// The Timely ID of the dataflow associated with this context.
69    pub dataflow_id: usize,
70    /// The collection IDs of exports of the dataflow associated with this context.
71    pub export_ids: Vec<GlobalId>,
72    /// Frontier before which updates should not be emitted.
73    ///
74    /// We *must* apply it to sinks, to ensure correct outputs.
75    /// We *should* apply it to sources and imported traces, because it improves performance.
76    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
77    /// Frontier after which updates should not be emitted.
78    /// Used to limit the amount of work done when appropriate.
79    pub until: Antichain<mz_repr::Timestamp>,
80    /// Bindings of identifiers to collections.
81    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
82    /// The logger, from Timely's logging framework, if logs are enabled.
83    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
84    /// Specification for rendering linear joins.
85    pub(super) linear_join_spec: LinearJoinSpec,
86    /// The expiration time for dataflows in this context. The output's frontier should never advance
87    /// past this frontier, except the empty frontier.
88    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
89    /// The config set for this context.
90    pub config_set: Rc<ConfigSet>,
91}
92
93impl<'scope, T: RenderTimestamp> Context<'scope, T> {
94    /// Creates a new empty Context.
95    pub fn for_dataflow_in<Plan>(
96        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
97        scope: Scope<'scope, T>,
98        compute_state: &ComputeState,
99        until: Antichain<mz_repr::Timestamp>,
100        dataflow_expiration: Antichain<mz_repr::Timestamp>,
101    ) -> Self {
102        use mz_ore::collections::CollectionExt as IteratorExt;
103        let dataflow_id = *scope.addr().into_first();
104        let as_of_frontier = dataflow
105            .as_of
106            .clone()
107            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
108
109        let export_ids = dataflow.export_ids().collect();
110
111        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
112        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
113        // want to reconsider in the future.
114        let compute_logger = if dataflow.is_transient() {
115            None
116        } else {
117            compute_state.compute_logger.clone()
118        };
119
120        Self {
121            scope,
122            debug_name: dataflow.debug_name.clone(),
123            dataflow_id,
124            export_ids,
125            as_of_frontier,
126            until,
127            bindings: BTreeMap::new(),
128            compute_logger,
129            linear_join_spec: compute_state.linear_join_spec,
130            dataflow_expiration,
131            config_set: Rc::clone(&compute_state.worker_config),
132        }
133    }
134}
135
136impl<'scope, T: RenderTimestamp> Context<'scope, T> {
137    /// Insert a collection bundle by an identifier.
138    ///
139    /// This is expected to be used to install external collections (sources, indexes, other views),
140    /// as well as for `Let` bindings of local collections.
141    pub fn insert_id(
142        &mut self,
143        id: Id,
144        collection: CollectionBundle<'scope, T>,
145    ) -> Option<CollectionBundle<'scope, T>> {
146        self.bindings.insert(id, collection)
147    }
148    /// Remove a collection bundle by an identifier.
149    ///
150    /// The primary use of this method is uninstalling `Let` bindings.
151    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
152        self.bindings.remove(&id)
153    }
154    /// Melds a collection bundle to whatever exists.
155    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
156        if !self.bindings.contains_key(&id) {
157            self.bindings.insert(id, collection);
158        } else {
159            let binding = self
160                .bindings
161                .get_mut(&id)
162                .expect("Binding verified to exist");
163            if collection.collection.is_some() {
164                binding.collection = collection.collection;
165            }
166            for (key, flavor) in collection.arranged.into_iter() {
167                binding.arranged.insert(key, flavor);
168            }
169        }
170    }
171    /// Look up a collection bundle by an identifier.
172    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
173        self.bindings.get(&id).cloned()
174    }
175
176    pub(super) fn error_logger(&self) -> ErrorLogger {
177        ErrorLogger::new(self.debug_name.clone())
178    }
179}
180
181impl<'scope, T: RenderTimestamp> Context<'scope, T> {
182    /// Brings the underlying arrangements and collections into a region.
183    pub fn enter_region<'a>(
184        &self,
185        region: Scope<'a, T>,
186        bindings: Option<&std::collections::BTreeSet<Id>>,
187    ) -> Context<'a, T> {
188        let bindings = self
189            .bindings
190            .iter()
191            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
192            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
193            .collect();
194
195        Context {
196            scope: region,
197            debug_name: self.debug_name.clone(),
198            dataflow_id: self.dataflow_id.clone(),
199            export_ids: self.export_ids.clone(),
200            as_of_frontier: self.as_of_frontier.clone(),
201            until: self.until.clone(),
202            compute_logger: self.compute_logger.clone(),
203            linear_join_spec: self.linear_join_spec.clone(),
204            bindings,
205            dataflow_expiration: self.dataflow_expiration.clone(),
206            config_set: Rc::clone(&self.config_set),
207        }
208    }
209}
210
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// A dataflow-local arrangement.
    ///
    /// Carries the arrangement of ok data and the arrangement of errors.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    ///
    /// The imported ok and error traces are timestamped with `mz_repr::Timestamp`
    /// and brought ("entered") into the scope's timestamp `T`.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
229
impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        // Reusable scratch space for datums, shared across invocations of `logic`.
        let mut datums = DatumVec::new();
        // Concatenate key and value datums and pack them into a single row.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        // Both flavors flatten the same way; errors are keyed by the error itself
        // with a unit value, so the key alone reconstructs the error record.
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
    /// an iterator of `(D, T, Diff)` updates.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        StreamVec<'scope, T, I::Item>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        I: IntoIterator<Item = (D, T, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff) -> I + 'static,
    {
        // Set a number of tuples after which the operator should yield.
        // This allows us to remain responsive even when enumerating a substantial
        // arrangement, as well as provides time to accumulate our produced output.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        // Wrap the caller's `logic`: decode at most `max_demand` datums, taken first
        // from the key and then — for whatever demand remains — from the value.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            // The value only needs to satisfy the demand the key did not cover.
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        // Defer to the fueled `flat_map_core` for the ok stream; errors are read
        // out directly (they are keyed by the error with a unit value).
        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
318impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
319    /// The scope containing the collection bundle.
320    pub fn scope(&self) -> Scope<'scope, T> {
321        match self {
322            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
323            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
324        }
325    }
326
327    /// Brings the arrangement flavor into a region.
328    pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
329        match self {
330            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
331                oks.clone().enter_region(region),
332                errs.clone().enter_region(region),
333            ),
334            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
335                *gid,
336                oks.clone().enter_region(region),
337                errs.clone().enter_region(region),
338            ),
339        }
340    }
341}
342impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
343    /// Extracts the arrangement flavor from a region.
344    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
345        match self {
346            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
347                oks.clone().leave_region(outer),
348                errs.clone().leave_region(outer),
349            ),
350            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
351                *gid,
352                oks.clone().leave_region(outer),
353                errs.clone().leave_region(outer),
354            ),
355        }
356    }
357}
358
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The collection as unarranged streams of updates, if maintained:
    /// ok data paired with the error stream.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions that form
    /// each arrangement's key.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
371
372impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
373    /// Construct a new collection bundle from update streams.
374    pub fn from_collections(
375        oks: VecCollection<'scope, T, Row, Diff>,
376        errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
377    ) -> Self {
378        Self {
379            collection: Some((oks, errs)),
380            arranged: BTreeMap::default(),
381        }
382    }
383
384    /// Inserts arrangements by the expressions on which they are keyed.
385    pub fn from_expressions(
386        exprs: Vec<MirScalarExpr>,
387        arrangements: ArrangementFlavor<'scope, T>,
388    ) -> Self {
389        let mut arranged = BTreeMap::new();
390        arranged.insert(exprs, arrangements);
391        Self {
392            collection: None,
393            arranged,
394        }
395    }
396
397    /// Inserts arrangements by the columns on which they are keyed.
398    pub fn from_columns<I: IntoIterator<Item = usize>>(
399        columns: I,
400        arrangements: ArrangementFlavor<'scope, T>,
401    ) -> Self {
402        let mut keys = Vec::new();
403        for column in columns {
404            keys.push(MirScalarExpr::column(column));
405        }
406        Self::from_expressions(keys, arrangements)
407    }
408
409    /// The scope containing the collection bundle.
410    pub fn scope(&self) -> Scope<'scope, T> {
411        if let Some((oks, _errs)) = &self.collection {
412            oks.inner.scope()
413        } else {
414            self.arranged
415                .values()
416                .next()
417                .expect("Must contain a valid collection")
418                .scope()
419        }
420    }
421
422    /// Brings the collection bundle into a region.
423    pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
424        CollectionBundle {
425            collection: self.collection.as_ref().map(|(oks, errs)| {
426                (
427                    oks.clone().enter_region(region),
428                    errs.clone().enter_region(region),
429                )
430            }),
431            arranged: self
432                .arranged
433                .iter()
434                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
435                .collect(),
436        }
437    }
438}
439
440impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
441    /// Extracts the collection bundle from a region.
442    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
443        CollectionBundle {
444            collection: self.collection.as_ref().map(|(oks, errs)| {
445                (
446                    oks.clone().leave_region(outer),
447                    errs.clone().leave_region(outer),
448                )
449            }),
450            arranged: self
451                .arranged
452                .iter()
453                .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
454                .collect(),
455        }
456    }
457}
458
459impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
460    /// Asserts that the arrangement for a specific key
461    /// (or the raw collection for no key) exists,
462    /// and returns the corresponding collection.
463    ///
464    /// This returns the collection as-is, without
465    /// doing any unthinning transformation.
466    /// Therefore, it should be used when the appropriate transformation
467    /// was planned as part of a following MFP.
468    ///
469    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
470    /// the fueled `flat_map` or `as_collection` method, depending on the flag
471    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
472    pub fn as_specific_collection(
473        &self,
474        key: Option<&[MirScalarExpr]>,
475        config_set: &ConfigSet,
476    ) -> (
477        VecCollection<'scope, T, Row, Diff>,
478        VecCollection<'scope, T, DataflowErrorSer, Diff>,
479    ) {
480        // Any operator that uses this method was told to use a particular
481        // collection during LIR planning, where we should have made
482        // sure that that collection exists.
483        //
484        // If it doesn't, we panic.
485        match key {
486            None => self
487                .collection
488                .clone()
489                .expect("The unarranged collection doesn't exist."),
490            Some(key) => {
491                let arranged = self.arranged.get(key).unwrap_or_else(|| {
492                    panic!("The collection arranged by {:?} doesn't exist.", key)
493                });
494                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
495                    // Decode all columns, pass max_demand as usize::MAX.
496                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
497                        Some((SharedRow::pack(borrow.iter()), t, r))
498                    });
499                    (ok.as_collection(), err)
500                } else {
501                    #[allow(deprecated)]
502                    arranged.as_collection()
503                }
504            }
505        }
506    }
507
508    /// Constructs and applies logic to elements of a collection and returns the results.
509    ///
510    /// The function applies `logic` on elements. The logic conceptually receives
511    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
512    ///
513    /// If `key_val` is set, this is a promise that `logic` will produce no results on
514    /// records for which the key does not evaluate to the value. This is used when we
515    /// have an arrangement by that key to leap directly to exactly those records.
516    /// It is important that `logic` still guard against data that does not satisfy
517    /// this constraint, as this method does not statically know that it will have
518    /// that arrangement.
519    ///
520    /// The `max_demand` parameter limits the number of columns decoded from the
521    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
522    /// decode all columns.
523    pub fn flat_map<D, I, L>(
524        &self,
525        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
526        max_demand: usize,
527        mut logic: L,
528    ) -> (
529        StreamVec<'scope, T, I::Item>,
530        VecCollection<'scope, T, DataflowErrorSer, Diff>,
531    )
532    where
533        I: IntoIterator<Item = (D, T, Diff)>,
534        D: Data,
535        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static,
536    {
537        // If `key_val` is set, we should have to use the corresponding arrangement.
538        // If there isn't one, that implies an error in the contract between
539        // key-production and available arrangements.
540        if let Some((key, val)) = key_val {
541            self.arrangement(&key)
542                .expect("Should have ensured during planning that this arrangement exists.")
543                .flat_map(val.as_ref(), max_demand, logic)
544        } else {
545            use timely::dataflow::operators::vec::Map;
546            let (oks, errs) = self
547                .collection
548                .clone()
549                .expect("Invariant violated: CollectionBundle contains no collection.");
550            let mut datums = DatumVec::new();
551            let oks = oks.inner.flat_map(move |(v, t, d)| {
552                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
553            });
554            (oks, errs)
555        }
556    }
557
558    /// Factored out common logic for using literal keys in general traces.
559    ///
560    /// This logic is sufficiently interesting that we want to write it only
561    /// once, and thereby avoid any skew in the two uses of the logic.
562    ///
563    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
564    /// where key and value are potentially specialized, but convertible into rows.
565    fn flat_map_core<Tr, D, I, L>(
566        trace: Arranged<'scope, Tr>,
567        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
568        mut logic: L,
569        refuel: usize,
570    ) -> StreamVec<'scope, T, I::Item>
571    where
572        Tr: for<'a> TraceReader<
573                Key<'a>: ToDatumIter,
574                Val<'a>: ToDatumIter,
575                Time = T,
576                Diff = mz_repr::Diff,
577            > + Clone
578            + 'static,
579        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
580        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
581        D: Data,
582        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff) -> I + 'static,
583    {
584        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
585        let scope = trace.stream.scope();
586
587        let mut key_con = Tr::KeyContainer::with_capacity(1);
588        if let Some(key) = &key {
589            key_con.push_own(key);
590        }
591        let mode = if key.is_some() { "index" } else { "scan" };
592        let name = format!("ArrangementFlatMap({})", mode);
593        use timely::dataflow::operators::Operator;
594        trace
595            .stream
596            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
597                // Acquire an activator to reschedule the operator when it has unfinished work.
598                let activator = scope.activator_for(info.address);
599                // Maintain a list of work to do, cursor to navigate and process.
600                let mut todo = std::collections::VecDeque::new();
601                move |input, output| {
602                    let key = key_con.get(0);
603                    // First, dequeue all batches.
604                    input.for_each(|time, data| {
605                        let capability = time.retain(0);
606                        for batch in data.iter() {
607                            // enqueue a capability, cursor, and batch.
608                            todo.push_back(PendingWork::new(
609                                capability.clone(),
610                                batch.cursor(),
611                                batch.clone(),
612                            ));
613                        }
614                    });
615
616                    // Second, make progress on `todo`.
617                    let mut fuel = refuel;
618                    while !todo.is_empty() && fuel > 0 {
619                        todo.front_mut().unwrap().do_work(
620                            key.as_ref(),
621                            &mut logic,
622                            &mut fuel,
623                            output,
624                        );
625                        if fuel > 0 {
626                            todo.pop_front();
627                        }
628                    }
629                    // If we have not finished all work, re-activate the operator.
630                    if !todo.is_empty() {
631                        activator.activate();
632                    }
633                }
634            })
635    }
636
637    /// Look up an arrangement by the expressions that form the key.
638    ///
639    /// The result may be `None` if no such arrangement exists, or it may be one of many
640    /// "arrangement flavors" that represent the types of arranged data we might have.
641    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
642        self.arranged.get(key).map(|x| x.clone())
643    }
644}
645
646impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
647    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
648    ///
649    /// This operator is able to apply the logic of `mfp` early, which can substantially
650    /// reduce the amount of data produced when `mfp` is non-trivial.
651    ///
652    /// The `key_val` argument, when present, indicates that a specific arrangement should
653    /// be used, and if, in addition, the `val` component is present,
654    /// that we can seek to the supplied row.
655    pub fn as_collection_core(
656        &self,
657        mut mfp: MapFilterProject,
658        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
659        until: Antichain<mz_repr::Timestamp>,
660        config_set: &ConfigSet,
661    ) -> (
662        VecCollection<'scope, T, mz_repr::Row, Diff>,
663        VecCollection<'scope, T, DataflowErrorSer, Diff>,
664    ) {
665        mfp.optimize();
666        let mfp_plan = mfp.clone().into_plan().unwrap();
667
668        // If the MFP is trivial, we can just call `as_collection`.
669        // In the case that we weren't going to apply the `key_val` optimization,
670        // this path results in a slightly smaller and faster
671        // dataflow graph, and is intended to fix
672        // https://github.com/MaterializeInc/database-issues/issues/3111
673        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
674            true
675        } else {
676            false
677        };
678
679        if mfp_plan.is_identity() && !has_key_val {
680            let key = key_val.map(|(k, _v)| k);
681            return self.as_specific_collection(key.as_deref(), config_set);
682        }
683
684        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
685        mfp.permute_fn(|c| c, max_demand);
686        mfp.optimize();
687        let mfp_plan = mfp.into_plan().unwrap();
688
689        let mut datum_vec = DatumVec::new();
690        // Wrap in an `Rc` so that lifetimes work out.
691        let until = std::rc::Rc::new(until);
692
693        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
694            let mut row_builder = SharedRow::get();
695            let until = std::rc::Rc::clone(&until);
696            let temp_storage = RowArena::new();
697            let row_iter = row_datums.iter();
698            let mut datums_local = datum_vec.borrow();
699            datums_local.extend(row_iter);
700            let time = time.clone();
701            let event_time = time.event_time();
702            mfp_plan
703                .evaluate(
704                    &mut datums_local,
705                    &temp_storage,
706                    event_time,
707                    diff.clone(),
708                    move |time| !until.less_equal(time),
709                    &mut row_builder,
710                )
711                .map(move |x| match x {
712                    Ok((row, event_time, diff)) => {
713                        // Copy the whole time, and re-populate event time.
714                        let mut time: T = time.clone();
715                        *time.event_time_mut() = event_time;
716                        (Ok(row), time, diff)
717                    }
718                    Err((e, event_time, diff)) => {
719                        // Copy the whole time, and re-populate event time.
720                        let mut time: T = time.clone();
721                        *time.event_time_mut() = event_time;
722                        (Err(e), time, diff)
723                    }
724                })
725        });
726
727        use differential_dataflow::AsCollection;
728        let (oks, errs) = stream
729            .as_collection()
730            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
731                "OkErr",
732                |x| x,
733            );
734
735        (oks, errors.concat(errs))
736    }
737    pub fn ensure_collections(
738        mut self,
739        collections: AvailableCollections,
740        input_key: Option<Vec<MirScalarExpr>>,
741        input_mfp: MapFilterProject,
742        as_of: Antichain<mz_repr::Timestamp>,
743        until: Antichain<mz_repr::Timestamp>,
744        config_set: &ConfigSet,
745        strategy: ArrangementStrategy,
746    ) -> Self
747    where
748        T: MaybeBucketByTime,
749    {
750        if collections == Default::default() {
751            return self;
752        }
753        // Cache collection to avoid reforming it each time.
754        //
755        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
756        // as the `map_fallible` that follows could be run against an arrangement itself.
757        //
758        // Note(btv): If we ever do that, we would then only need to make the raw collection here
759        // if `collections.raw` is true.
760
761        for (key, _, _) in collections.arranged.iter() {
762            soft_assert_or_log!(
763                !self.arranged.contains_key(key),
764                "LIR ArrangeBy tried to create an existing arrangement"
765            );
766        }
767
768        // Track whether we applied temporal bucketing, to avoid double-bucketing.
769        let mut bucketed = false;
770
771        // True iff at least one new arrangement will actually be built below. Bucketing only
772        // pays off when something downstream merges/compacts the future-stamped updates; on a
773        // pure raw collection (no new arrangement) the work is wasted.
774        let will_create_arrangement = collections
775            .arranged
776            .iter()
777            .any(|(key, _, _)| !self.arranged.contains_key(key));
778
779        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
780        let form_raw_collection = collections.raw || will_create_arrangement;
781        if form_raw_collection && self.collection.is_none() {
782            let (oks, errs) =
783                self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
784            // Apply temporal bucketing when the lowering selected `TemporalBucketing` and
785            // we will build at least one arrangement. This path fires when the collection
786            // must be formed from scratch (e.g., from an arrangement via as_collection_core).
787            let oks = if will_create_arrangement
788                && matches!(strategy, ArrangementStrategy::TemporalBucketing)
789                && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
790            {
791                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
792                    .get(config_set)
793                    .try_into()
794                    .expect("must fit");
795                bucketed = true;
796                T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
797            } else {
798                oks
799            };
800            self.collection = Some((oks, errs));
801        }
802        for (key, _, thinning) in collections.arranged {
803            if !self.arranged.contains_key(&key) {
804                // TODO: Consider allowing more expressive names.
805                let name = format!("ArrangeBy[{:?}]", key);
806
807                let (oks, errs) = self
808                    .collection
809                    .take()
810                    .expect("Collection constructed above");
811                // Apply temporal bucketing if the collection already existed on
812                // the bundle (e.g., from an upstream temporal Mfp or Get) and we
813                // haven't bucketed yet. This is the common path for temporal-MFP
814                // → ArrangeBy flows.
815                let oks = if !bucketed
816                    && matches!(strategy, ArrangementStrategy::TemporalBucketing)
817                    && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
818                {
819                    let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
820                        .get(config_set)
821                        .try_into()
822                        .expect("must fit");
823                    bucketed = true;
824                    T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
825                } else {
826                    oks
827                };
828                let (oks, errs_keyed, passthrough) =
829                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
830                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
831                self.collection = Some((passthrough, errs));
832                let errs =
833                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
834                        &format!("{}-errors", name),
835                    );
836                self.arranged
837                    .insert(key, ArrangementFlavor::Local(oks, errs));
838            }
839        }
840        self
841    }
842
843    /// Builds an arrangement from a collection, using the specified key and value thinning.
844    ///
845    /// The arrangement's key is based on the `key` expressions, and the value the input with
846    /// the `thinning` applied to it. It selects which of the input columns are included in the
847    /// value of the arrangement. The thinning is in support of permuting arrangements such that
848    /// columns in the key are not included in the value.
849    ///
850    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
851    /// the input as-is, which allows downstream consumers to reuse the collection without
852    /// teeing the stream.
853    fn arrange_collection(
854        name: &String,
855        oks: VecCollection<'scope, T, Row, Diff>,
856        key: Vec<MirScalarExpr>,
857        thinning: Vec<usize>,
858    ) -> (
859        Arranged<'scope, RowRowAgent<T, Diff>>,
860        VecCollection<'scope, T, DataflowErrorSer, Diff>,
861        VecCollection<'scope, T, Row, Diff>,
862    ) {
863        // This operator implements a `map_fallible`, but produces columnar updates for the ok
864        // stream. The `map_fallible` cannot be used here because the closure cannot return
865        // references, which is what we need to push into columnar streams. Instead, we use a
866        // bespoke operator that also optimizes reuse of allocations across individual updates.
867        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
868        let (ok_output, ok_stream) = builder.new_output();
869        let mut ok_output =
870            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
871        let (err_output, err_stream) = builder.new_output();
872        let mut err_output = OutputBuilder::from(err_output);
873        let (passthrough_output, passthrough_stream) = builder.new_output();
874        let mut passthrough_output = OutputBuilder::from(passthrough_output);
875        let mut input = builder.new_input(oks.inner, Pipeline);
876        builder.set_notify_for(0, FrontierInterest::Never);
877        builder.build(move |_capabilities| {
878            let mut key_buf = Row::default();
879            let mut val_buf = Row::default();
880            let mut datums = DatumVec::new();
881            let mut temp_storage = RowArena::new();
882            move |_frontiers| {
883                let mut ok_output = ok_output.activate();
884                let mut err_output = err_output.activate();
885                let mut passthrough_output = passthrough_output.activate();
886                input.for_each(|time, data| {
887                    let mut ok_session = ok_output.session_with_builder(&time);
888                    let mut err_session = err_output.session(&time);
889                    for (row, time, diff) in data.iter() {
890                        temp_storage.clear();
891                        let datums = datums.borrow_with(row);
892                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
893                        match key_buf.packer().try_extend(key_iter) {
894                            Ok(()) => {
895                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
896                                val_buf.packer().extend(val_datum_iter);
897                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
898                            }
899                            Err(e) => {
900                                err_session.give((e.into(), time.clone(), *diff));
901                            }
902                        }
903                    }
904                    passthrough_output.session(&time).give_container(data);
905                });
906            }
907        });
908
909        let oks = ok_stream
910            .mz_arrange_core::<
911                _,
912                Col2ValBatcher<_, _, _, _>,
913                RowRowBuilder<_, _>,
914                RowRowSpine<_, _>,
915            >(
916                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
917                    columnar_exchange::<Row, Row, T, Diff>,
918                ),
919                name
920            );
921        (
922            oks,
923            err_stream.as_collection(),
924            passthrough_stream.as_collection(),
925        )
926    }
927}
928
/// A partially-processed batch: a cursor into the batch together with the
/// capability at which its results are sent. `do_work` may return early when it
/// runs out of fuel; the retained cursor position lets a later call resume.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability at which results for this batch are emitted.
    capability: Capability<C::Time>,
    /// Cursor tracking how far through `batch` processing has advanced.
    cursor: C,
    /// The storage backing `cursor`.
    batch: C::Storage,
}
937
938impl<C> PendingWork<C>
939where
940    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
941{
942    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
943    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
944        Self {
945            capability,
946            cursor,
947            batch,
948        }
949    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// When `key` is `Some`, the cursor is positioned at (or sought to) that key and only its
    /// values are processed; otherwise every key in the batch is scanned. Each emitted result
    /// counts as one unit of work. Once at least `fuel` results have been produced, the method
    /// sets `*fuel = 0` and returns early, leaving the cursor in place so a subsequent call can
    /// resume. If the batch is exhausted first, the work performed is subtracted from `fuel`.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for a value's (time, diff) history; drained and reused per value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek only when the cursor is not already positioned at `key`.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    // Collect and consolidate this value's update history before
                    // handing the updates to `logic`.
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Fuel exhausted: stop; the cursor position enables resumption.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // No specific key requested: process the entire batch, key by key.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    // Collect and consolidate this value's update history before
                    // handing the updates to `logic`.
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Fuel exhausted: stop; the cursor position enables resumption.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Batch exhausted before the fuel was: charge only the work performed.
        *fuel -= work;
    }
1017}