// mz_compute/render/context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::CollectionExt;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
38use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
39use timely::dataflow::scopes::Child;
40use timely::dataflow::{Scope, StreamVec};
41use timely::progress::timestamp::Refines;
42use timely::progress::{Antichain, Timestamp};
43
44use crate::compute_state::ComputeState;
45use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
46use crate::render::errors::ErrorLogger;
47use crate::render::{LinearJoinSpec, RenderTimestamp};
48use crate::row_spine::{DatumSeq, RowRowBuilder};
49use crate::typedefs::{
50    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
51    RowRowSpine,
52};
53
/// Dataflow-local collections and arrangements.
///
/// A context serves to wrap available data assets and present them in an easy-to-use manner.
/// These assets include dataflow-local collections and arrangements, as well as imported
/// arrangements from outside the dataflow.
///
/// Context has two timestamp types, one from `S::Timestamp` and one from `T`, where the
/// former must refine the latter. The former is the timestamp used by the scope in question,
/// and the latter is the timestamp of imported traces. The two may be different in the case
/// of regions or iteration.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<T>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<T>,
    /// Bindings of identifiers to collections.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never advance
    /// past this frontier, except the empty frontier.
    pub dataflow_expiration: Antichain<T>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
99
100impl<S: Scope> Context<S>
101where
102    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
103{
104    /// Creates a new empty Context.
105    pub fn for_dataflow_in<Plan>(
106        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
107        scope: S,
108        compute_state: &ComputeState,
109        until: Antichain<mz_repr::Timestamp>,
110        dataflow_expiration: Antichain<mz_repr::Timestamp>,
111    ) -> Self {
112        use mz_ore::collections::CollectionExt as IteratorExt;
113        let dataflow_id = *scope.addr().into_first();
114        let as_of_frontier = dataflow
115            .as_of
116            .clone()
117            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
118
119        let export_ids = dataflow.export_ids().collect();
120
121        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
122        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
123        // want to reconsider in the future.
124        let compute_logger = if dataflow.is_transient() {
125            None
126        } else {
127            compute_state.compute_logger.clone()
128        };
129
130        Self {
131            scope,
132            debug_name: dataflow.debug_name.clone(),
133            dataflow_id,
134            export_ids,
135            as_of_frontier,
136            until,
137            bindings: BTreeMap::new(),
138            compute_logger,
139            linear_join_spec: compute_state.linear_join_spec,
140            dataflow_expiration,
141            config_set: Rc::clone(&compute_state.worker_config),
142        }
143    }
144}
145
146impl<S: Scope, T> Context<S, T>
147where
148    T: MzTimestamp,
149    S::Timestamp: MzTimestamp + Refines<T>,
150{
151    /// Insert a collection bundle by an identifier.
152    ///
153    /// This is expected to be used to install external collections (sources, indexes, other views),
154    /// as well as for `Let` bindings of local collections.
155    pub fn insert_id(
156        &mut self,
157        id: Id,
158        collection: CollectionBundle<S, T>,
159    ) -> Option<CollectionBundle<S, T>> {
160        self.bindings.insert(id, collection)
161    }
162    /// Remove a collection bundle by an identifier.
163    ///
164    /// The primary use of this method is uninstalling `Let` bindings.
165    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
166        self.bindings.remove(&id)
167    }
168    /// Melds a collection bundle to whatever exists.
169    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
170        if !self.bindings.contains_key(&id) {
171            self.bindings.insert(id, collection);
172        } else {
173            let binding = self
174                .bindings
175                .get_mut(&id)
176                .expect("Binding verified to exist");
177            if collection.collection.is_some() {
178                binding.collection = collection.collection;
179            }
180            for (key, flavor) in collection.arranged.into_iter() {
181                binding.arranged.insert(key, flavor);
182            }
183        }
184    }
185    /// Look up a collection bundle by an identifier.
186    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
187        self.bindings.get(&id).cloned()
188    }
189
190    pub(super) fn error_logger(&self) -> ErrorLogger {
191        ErrorLogger::new(self.debug_name.clone())
192    }
193}
194
195impl<S: Scope, T> Context<S, T>
196where
197    T: MzTimestamp,
198    S::Timestamp: MzTimestamp + Refines<T>,
199{
200    /// Brings the underlying arrangements and collections into a region.
201    pub fn enter_region<'a>(
202        &self,
203        region: &Child<'a, S, S::Timestamp>,
204        bindings: Option<&std::collections::BTreeSet<Id>>,
205    ) -> Context<Child<'a, S, S::Timestamp>, T> {
206        let bindings = self
207            .bindings
208            .iter()
209            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
210            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
211            .collect();
212
213        Context {
214            scope: region.clone(),
215            debug_name: self.debug_name.clone(),
216            dataflow_id: self.dataflow_id.clone(),
217            export_ids: self.export_ids.clone(),
218            as_of_frontier: self.as_of_frontier.clone(),
219            until: self.until.clone(),
220            compute_logger: self.compute_logger.clone(),
221            linear_join_spec: self.linear_join_spec.clone(),
222            bindings,
223            dataflow_expiration: self.dataflow_expiration.clone(),
224            config_set: Rc::clone(&self.config_set),
225        }
226    }
227}
228
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// A dataflow-local arrangement.
    ///
    /// The first arrangement holds the ok rows, the second the errors, both
    /// timestamped with the scope's own timestamp.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
251
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        // Re-pack each (key, value) datum pair into a single output row.
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        // Errors are keyed by the error itself with unit values, so the key clone
        // reconstitutes the error collection.
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
    /// an iterator of `(D, S::Timestamp, Diff)` updates.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Set a number of tuples after which the operator should yield.
        // This allows us to remain responsive even when enumerating a substantial
        // arrangement, as well as provides time to accumulate our produced output.
        let refuel = 1000000;

        // Adapt the datum-vector `logic` to the (key, value) shape `flat_map_core`
        // expects: decode at most `max_demand` columns in total, drawing from the
        // key first and spending any remaining budget on the value.
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            // Budget left over after the key's columns have been decoded.
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
341impl<S: Scope, T> ArrangementFlavor<S, T>
342where
343    T: MzTimestamp,
344    S::Timestamp: MzTimestamp + Refines<T>,
345{
346    /// The scope containing the collection bundle.
347    pub fn scope(&self) -> S {
348        match self {
349            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
350            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
351        }
352    }
353
354    /// Brings the arrangement flavor into a region.
355    pub fn enter_region<'a>(
356        &self,
357        region: &Child<'a, S, S::Timestamp>,
358    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
359        match self {
360            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
361                oks.clone().enter_region(region),
362                errs.clone().enter_region(region),
363            ),
364            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
365                *gid,
366                oks.clone().enter_region(region),
367                errs.clone().enter_region(region),
368            ),
369        }
370    }
371}
372impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
373where
374    T: MzTimestamp,
375    S::Timestamp: MzTimestamp + Refines<T>,
376{
377    /// Extracts the arrangement flavor from a region.
378    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
379        match self {
380            ArrangementFlavor::Local(oks, errs) => {
381                ArrangementFlavor::Local(oks.clone().leave_region(), errs.clone().leave_region())
382            }
383            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
384                *gid,
385                oks.clone().leave_region(),
386                errs.clone().leave_region(),
387            ),
388        }
389    }
390}
391
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The unarranged representation: a stream of ok rows and a stream of errors,
    /// if such a representation is maintained for this collection.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    /// Arranged representations, keyed by the expressions the data is arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
408
409impl<S: Scope, T> CollectionBundle<S, T>
410where
411    T: MzTimestamp,
412    S::Timestamp: MzTimestamp + Refines<T>,
413{
414    /// Construct a new collection bundle from update streams.
415    pub fn from_collections(
416        oks: VecCollection<S, Row, Diff>,
417        errs: VecCollection<S, DataflowError, Diff>,
418    ) -> Self {
419        Self {
420            collection: Some((oks, errs)),
421            arranged: BTreeMap::default(),
422        }
423    }
424
425    /// Inserts arrangements by the expressions on which they are keyed.
426    pub fn from_expressions(
427        exprs: Vec<MirScalarExpr>,
428        arrangements: ArrangementFlavor<S, T>,
429    ) -> Self {
430        let mut arranged = BTreeMap::new();
431        arranged.insert(exprs, arrangements);
432        Self {
433            collection: None,
434            arranged,
435        }
436    }
437
438    /// Inserts arrangements by the columns on which they are keyed.
439    pub fn from_columns<I: IntoIterator<Item = usize>>(
440        columns: I,
441        arrangements: ArrangementFlavor<S, T>,
442    ) -> Self {
443        let mut keys = Vec::new();
444        for column in columns {
445            keys.push(MirScalarExpr::column(column));
446        }
447        Self::from_expressions(keys, arrangements)
448    }
449
450    /// The scope containing the collection bundle.
451    pub fn scope(&self) -> S {
452        if let Some((oks, _errs)) = &self.collection {
453            oks.inner.scope()
454        } else {
455            self.arranged
456                .values()
457                .next()
458                .expect("Must contain a valid collection")
459                .scope()
460        }
461    }
462
463    /// Brings the collection bundle into a region.
464    pub fn enter_region<'a>(
465        &self,
466        region: &Child<'a, S, S::Timestamp>,
467    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
468        CollectionBundle {
469            collection: self.collection.as_ref().map(|(oks, errs)| {
470                (
471                    oks.clone().enter_region(region),
472                    errs.clone().enter_region(region),
473                )
474            }),
475            arranged: self
476                .arranged
477                .iter()
478                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
479                .collect(),
480        }
481    }
482}
483
484impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
485where
486    T: MzTimestamp,
487    S::Timestamp: MzTimestamp + Refines<T>,
488{
489    /// Extracts the collection bundle from a region.
490    pub fn leave_region(&self) -> CollectionBundle<S, T> {
491        CollectionBundle {
492            collection: self
493                .collection
494                .as_ref()
495                .map(|(oks, errs)| (oks.clone().leave_region(), errs.clone().leave_region())),
496            arranged: self
497                .arranged
498                .iter()
499                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
500                .collect(),
501        }
502    }
503}
504
505impl<S: Scope, T> CollectionBundle<S, T>
506where
507    T: MzTimestamp,
508    S::Timestamp: MzTimestamp + Refines<T>,
509{
510    /// Asserts that the arrangement for a specific key
511    /// (or the raw collection for no key) exists,
512    /// and returns the corresponding collection.
513    ///
514    /// This returns the collection as-is, without
515    /// doing any unthinning transformation.
516    /// Therefore, it should be used when the appropriate transformation
517    /// was planned as part of a following MFP.
518    ///
519    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
520    /// the fueled `flat_map` or `as_collection` method, depending on the flag
521    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
522    pub fn as_specific_collection(
523        &self,
524        key: Option<&[MirScalarExpr]>,
525        config_set: &ConfigSet,
526    ) -> (
527        VecCollection<S, Row, Diff>,
528        VecCollection<S, DataflowError, Diff>,
529    ) {
530        // Any operator that uses this method was told to use a particular
531        // collection during LIR planning, where we should have made
532        // sure that that collection exists.
533        //
534        // If it doesn't, we panic.
535        match key {
536            None => self
537                .collection
538                .clone()
539                .expect("The unarranged collection doesn't exist."),
540            Some(key) => {
541                let arranged = self.arranged.get(key).unwrap_or_else(|| {
542                    panic!("The collection arranged by {:?} doesn't exist.", key)
543                });
544                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
545                    // Decode all columns, pass max_demand as usize::MAX.
546                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
547                        Some((SharedRow::pack(borrow.iter()), t, r))
548                    });
549                    (ok.as_collection(), err)
550                } else {
551                    #[allow(deprecated)]
552                    arranged.as_collection()
553                }
554            }
555        }
556    }
557
558    /// Constructs and applies logic to elements of a collection and returns the results.
559    ///
560    /// The function applies `logic` on elements. The logic conceptually receives
561    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
562    ///
563    /// If `key_val` is set, this is a promise that `logic` will produce no results on
564    /// records for which the key does not evaluate to the value. This is used when we
565    /// have an arrangement by that key to leap directly to exactly those records.
566    /// It is important that `logic` still guard against data that does not satisfy
567    /// this constraint, as this method does not statically know that it will have
568    /// that arrangement.
569    ///
570    /// The `max_demand` parameter limits the number of columns decoded from the
571    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
572    /// decode all columns.
573    pub fn flat_map<D, I, L>(
574        &self,
575        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
576        max_demand: usize,
577        mut logic: L,
578    ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
579    where
580        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
581        D: Data,
582        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
583    {
584        // If `key_val` is set, we should have to use the corresponding arrangement.
585        // If there isn't one, that implies an error in the contract between
586        // key-production and available arrangements.
587        if let Some((key, val)) = key_val {
588            self.arrangement(&key)
589                .expect("Should have ensured during planning that this arrangement exists.")
590                .flat_map(val.as_ref(), max_demand, logic)
591        } else {
592            use timely::dataflow::operators::vec::Map;
593            let (oks, errs) = self
594                .collection
595                .clone()
596                .expect("Invariant violated: CollectionBundle contains no collection.");
597            let mut datums = DatumVec::new();
598            let oks = oks.inner.flat_map(move |(v, t, d)| {
599                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
600            });
601            (oks, errs)
602        }
603    }
604
605    /// Factored out common logic for using literal keys in general traces.
606    ///
607    /// This logic is sufficiently interesting that we want to write it only
608    /// once, and thereby avoid any skew in the two uses of the logic.
609    ///
610    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
611    /// where key and value are potentially specialized, but convertible into rows.
612    fn flat_map_core<Tr, D, I, L>(
613        trace: Arranged<S, Tr>,
614        key: Option<&Tr::KeyOwn>,
615        mut logic: L,
616        refuel: usize,
617    ) -> StreamVec<S, I::Item>
618    where
619        Tr: for<'a> TraceReader<
620                Key<'a>: ToDatumIter,
621                KeyOwn: PartialEq,
622                Val<'a>: ToDatumIter,
623                Time = S::Timestamp,
624                Diff = mz_repr::Diff,
625            > + Clone
626            + 'static,
627        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
628        D: Data,
629        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
630    {
631        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
632        let scope = trace.stream.scope();
633
634        let mut key_con = Tr::KeyContainer::with_capacity(1);
635        if let Some(key) = &key {
636            key_con.push_own(key);
637        }
638        let mode = if key.is_some() { "index" } else { "scan" };
639        let name = format!("ArrangementFlatMap({})", mode);
640        use timely::dataflow::operators::Operator;
641        trace
642            .stream
643            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
644                // Acquire an activator to reschedule the operator when it has unfinished work.
645                let activator = scope.activator_for(info.address);
646                // Maintain a list of work to do, cursor to navigate and process.
647                let mut todo = std::collections::VecDeque::new();
648                move |input, output| {
649                    let key = key_con.get(0);
650                    // First, dequeue all batches.
651                    input.for_each(|time, data| {
652                        let capability = time.retain(0);
653                        for batch in data.iter() {
654                            // enqueue a capability, cursor, and batch.
655                            todo.push_back(PendingWork::new(
656                                capability.clone(),
657                                batch.cursor(),
658                                batch.clone(),
659                            ));
660                        }
661                    });
662
663                    // Second, make progress on `todo`.
664                    let mut fuel = refuel;
665                    while !todo.is_empty() && fuel > 0 {
666                        todo.front_mut().unwrap().do_work(
667                            key.as_ref(),
668                            &mut logic,
669                            &mut fuel,
670                            output,
671                        );
672                        if fuel > 0 {
673                            todo.pop_front();
674                        }
675                    }
676                    // If we have not finished all work, re-activate the operator.
677                    if !todo.is_empty() {
678                        activator.activate();
679                    }
680                }
681            })
682    }
683
684    /// Look up an arrangement by the expressions that form the key.
685    ///
686    /// The result may be `None` if no such arrangement exists, or it may be one of many
687    /// "arrangement flavors" that represent the types of arranged data we might have.
688    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
689        self.arranged.get(key).map(|x| x.clone())
690    }
691}
692
693impl<S, T> CollectionBundle<S, T>
694where
695    T: MzTimestamp,
696    S: Scope,
697    S::Timestamp: Refines<T> + RenderTimestamp,
698{
699    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
700    ///
701    /// This operator is able to apply the logic of `mfp` early, which can substantially
702    /// reduce the amount of data produced when `mfp` is non-trivial.
703    ///
704    /// The `key_val` argument, when present, indicates that a specific arrangement should
705    /// be used, and if, in addition, the `val` component is present,
706    /// that we can seek to the supplied row.
707    pub fn as_collection_core(
708        &self,
709        mut mfp: MapFilterProject,
710        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
711        until: Antichain<mz_repr::Timestamp>,
712        config_set: &ConfigSet,
713    ) -> (
714        VecCollection<S, mz_repr::Row, Diff>,
715        VecCollection<S, DataflowError, Diff>,
716    ) {
717        mfp.optimize();
718        let mfp_plan = mfp.clone().into_plan().unwrap();
719
720        // If the MFP is trivial, we can just call `as_collection`.
721        // In the case that we weren't going to apply the `key_val` optimization,
722        // this path results in a slightly smaller and faster
723        // dataflow graph, and is intended to fix
724        // https://github.com/MaterializeInc/database-issues/issues/3111
725        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
726            true
727        } else {
728            false
729        };
730
731        if mfp_plan.is_identity() && !has_key_val {
732            let key = key_val.map(|(k, _v)| k);
733            return self.as_specific_collection(key.as_deref(), config_set);
734        }
735
736        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
737        mfp.permute_fn(|c| c, max_demand);
738        mfp.optimize();
739        let mfp_plan = mfp.into_plan().unwrap();
740
741        let mut datum_vec = DatumVec::new();
742        // Wrap in an `Rc` so that lifetimes work out.
743        let until = std::rc::Rc::new(until);
744
745        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
746            let mut row_builder = SharedRow::get();
747            let until = std::rc::Rc::clone(&until);
748            let temp_storage = RowArena::new();
749            let row_iter = row_datums.iter();
750            let mut datums_local = datum_vec.borrow();
751            datums_local.extend(row_iter);
752            let time = time.clone();
753            let event_time = time.event_time();
754            mfp_plan
755                .evaluate(
756                    &mut datums_local,
757                    &temp_storage,
758                    event_time,
759                    diff.clone(),
760                    move |time| !until.less_equal(time),
761                    &mut row_builder,
762                )
763                .map(move |x| match x {
764                    Ok((row, event_time, diff)) => {
765                        // Copy the whole time, and re-populate event time.
766                        let mut time: S::Timestamp = time.clone();
767                        *time.event_time_mut() = event_time;
768                        (Ok(row), time, diff)
769                    }
770                    Err((e, event_time, diff)) => {
771                        // Copy the whole time, and re-populate event time.
772                        let mut time: S::Timestamp = time.clone();
773                        *time.event_time_mut() = event_time;
774                        (Err(e), time, diff)
775                    }
776                })
777        });
778
779        use differential_dataflow::AsCollection;
780        let (oks, errs) = stream
781            .as_collection()
782            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
783                "OkErr",
784                |x| x,
785            );
786
787        (oks, errors.concat(errs))
788    }
    /// Ensures that every collection form named in `collections` (the raw
    /// collection and/or specific arrangements) exists on `self`, rendering
    /// any that are missing. Returns `self` (consumed and rebuilt) so the
    /// cached collection and arrangement map can be updated in place.
    ///
    /// * `collections`: the forms that must be available afterwards.
    /// * `input_key` / `input_mfp`: key and map-filter-project forwarded to
    ///   `as_collection_core` when the raw collection must be formed.
    /// * `until`: frontier forwarded to `as_collection_core`; updates beyond
    ///   it need not be produced.
    /// * `config_set`: dynamic configuration forwarded to `as_collection_core`.
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        // Nothing was requested; skip all rendering work.
        if collections == Default::default() {
            return self;
        }
        // Cache collection to avoid reforming it each time.
        //
        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
        // as the `map_fallible` that follows could be run against an arrangement itself.
        //
        // Note(btv): If we ever do that, we would then only need to make the raw collection here
        // if `collections.raw` is true.

        // Requesting an arrangement that already exists indicates an LIR-level
        // planning bug; log it (soft assert) rather than crashing the dataflow.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                // TODO: Consider allowing more expressive names.
                let name = format!("ArrangeBy[{:?}]", key);

                // Take the cached collection; the passthrough stream produced by
                // `arrange_collection` is reinstated below so subsequent loop
                // iterations can keep using it without teeing the stream.
                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                // The error arrangement carries both pre-existing errors and any
                // new key-evaluation errors; the cached collection keeps only the
                // original (un-keyed) error stream.
                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }
852
853    /// Builds an arrangement from a collection, using the specified key and value thinning.
854    ///
855    /// The arrangement's key is based on the `key` expressions, and the value the input with
856    /// the `thinning` applied to it. It selects which of the input columns are included in the
857    /// value of the arrangement. The thinning is in support of permuting arrangements such that
858    /// columns in the key are not included in the value.
859    ///
860    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
861    /// the input as-is, which allows downstream consumers to reuse the collection without
862    /// teeing the stream.
863    fn arrange_collection(
864        name: &String,
865        oks: VecCollection<S, Row, Diff>,
866        key: Vec<MirScalarExpr>,
867        thinning: Vec<usize>,
868    ) -> (
869        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
870        VecCollection<S, DataflowError, Diff>,
871        VecCollection<S, Row, Diff>,
872    ) {
873        // This operator implements a `map_fallible`, but produces columnar updates for the ok
874        // stream. The `map_fallible` cannot be used here because the closure cannot return
875        // references, which is what we need to push into columnar streams. Instead, we use a
876        // bespoke operator that also optimizes reuse of allocations across individual updates.
877        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
878        let (ok_output, ok_stream) = builder.new_output();
879        let mut ok_output =
880            OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output);
881        let (err_output, err_stream) = builder.new_output();
882        let mut err_output = OutputBuilder::from(err_output);
883        let (passthrough_output, passthrough_stream) = builder.new_output();
884        let mut passthrough_output = OutputBuilder::from(passthrough_output);
885        let mut input = builder.new_input(oks.inner, Pipeline);
886        builder.set_notify(false);
887        builder.build(move |_capabilities| {
888            let mut key_buf = Row::default();
889            let mut val_buf = Row::default();
890            let mut datums = DatumVec::new();
891            let mut temp_storage = RowArena::new();
892            move |_frontiers| {
893                let mut ok_output = ok_output.activate();
894                let mut err_output = err_output.activate();
895                let mut passthrough_output = passthrough_output.activate();
896                input.for_each(|time, data| {
897                    let mut ok_session = ok_output.session_with_builder(&time);
898                    let mut err_session = err_output.session(&time);
899                    for (row, time, diff) in data.iter() {
900                        temp_storage.clear();
901                        let datums = datums.borrow_with(row);
902                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
903                        match key_buf.packer().try_extend(key_iter) {
904                            Ok(()) => {
905                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
906                                val_buf.packer().extend(val_datum_iter);
907                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
908                            }
909                            Err(e) => {
910                                err_session.give((e.into(), time.clone(), *diff));
911                            }
912                        }
913                    }
914                    passthrough_output.session(&time).give_container(data);
915                });
916            }
917        });
918
919        let oks = ok_stream
920            .mz_arrange_core::<
921                _,
922                Col2ValBatcher<_, _, _, _>,
923                RowRowBuilder<_, _>,
924                RowRowSpine<_, _>,
925            >(
926                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
927                    columnar_exchange::<Row, Row, S::Timestamp, Diff>,
928                ),
929                name
930            );
931        (
932            oks,
933            err_stream.as_collection(),
934            passthrough_stream.as_collection(),
935        )
936    }
937}
938
/// A batch of updates awaiting (possibly fuel-limited) processing, together
/// with the cursor position reached so far and the capability under which
/// results may be emitted.
struct PendingWork<C>
where
    C: Cursor,
{
    // Capability for the times of updates in `batch`; held so output can
    // still be produced when processing resumes on a later activation.
    capability: Capability<C::Time>,
    // Cursor into `batch`; retains its position between `do_work` calls so
    // work can resume where fuel ran out.
    cursor: C,
    // The storage backing `cursor`.
    batch: C::Storage,
}
947
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// If `key` is `Some`, only updates for that exact key are processed (the
    /// cursor seeks to it); otherwise the cursor walks all remaining keys.
    /// Each update produced by `logic` counts as one unit of work. When the
    /// work performed reaches `fuel`, `fuel` is set to `0` and the method
    /// returns early with the cursor left in place, so a later call can
    /// resume; otherwise `fuel` is decremented by the work performed.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for (time, diff) pairs, consolidated per value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek only if the cursor is not already positioned at `key`
            // (it may be, if a previous fuel-exhausted call stopped here).
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Merge duplicate times so `logic` sees consolidated diffs.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Fuel exhausted: stop here; the cursor position is kept
                    // so the next call resumes at the following value.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // No key given: process every remaining (key, val) pair in order.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Merge duplicate times so `logic` sees consolidated diffs.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Fuel exhausted: stop mid-key; resumable on a later call.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Batch fully processed: charge the work actually performed.
        *fuel -= work;
    }
}
1027}