// mz_compute/render/context.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Management of dataflow-local state, like arrangements, while building a
//! dataflow.
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::CollectionExt;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
38use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
39use timely::dataflow::scopes::Child;
40use timely::dataflow::{Scope, StreamVec};
41use timely::progress::operate::FrontierInterest;
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::ErrorLogger;
48use crate::render::{LinearJoinSpec, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
52    RowRowSpine,
53};
54
/// Dataflow-local collections and arrangements.
///
/// A context means to wrap available data assets and present them in an easy-to-use manner.
/// These assets include dataflow-local collections and arrangements, as well as imported
/// arrangements from outside the dataflow.
///
/// Context has two timestamp types, one from `S::Timestamp` and one from `T`, where the
/// former must refine the latter. The former is the timestamp used by the scope in question,
/// and the latter is the timestamp of imported traces. The two may be different in the case
/// of regions or iteration.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<T>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<T>,
    /// Bindings of identifiers to collections.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never advance
    /// past this frontier, except the empty frontier.
    pub dataflow_expiration: Antichain<T>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
100
101impl<S: Scope> Context<S>
102where
103    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
104{
105    /// Creates a new empty Context.
106    pub fn for_dataflow_in<Plan>(
107        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
108        scope: S,
109        compute_state: &ComputeState,
110        until: Antichain<mz_repr::Timestamp>,
111        dataflow_expiration: Antichain<mz_repr::Timestamp>,
112    ) -> Self {
113        use mz_ore::collections::CollectionExt as IteratorExt;
114        let dataflow_id = *scope.addr().into_first();
115        let as_of_frontier = dataflow
116            .as_of
117            .clone()
118            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
119
120        let export_ids = dataflow.export_ids().collect();
121
122        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
123        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
124        // want to reconsider in the future.
125        let compute_logger = if dataflow.is_transient() {
126            None
127        } else {
128            compute_state.compute_logger.clone()
129        };
130
131        Self {
132            scope,
133            debug_name: dataflow.debug_name.clone(),
134            dataflow_id,
135            export_ids,
136            as_of_frontier,
137            until,
138            bindings: BTreeMap::new(),
139            compute_logger,
140            linear_join_spec: compute_state.linear_join_spec,
141            dataflow_expiration,
142            config_set: Rc::clone(&compute_state.worker_config),
143        }
144    }
145}
146
147impl<S: Scope, T> Context<S, T>
148where
149    T: MzTimestamp,
150    S::Timestamp: MzTimestamp + Refines<T>,
151{
152    /// Insert a collection bundle by an identifier.
153    ///
154    /// This is expected to be used to install external collections (sources, indexes, other views),
155    /// as well as for `Let` bindings of local collections.
156    pub fn insert_id(
157        &mut self,
158        id: Id,
159        collection: CollectionBundle<S, T>,
160    ) -> Option<CollectionBundle<S, T>> {
161        self.bindings.insert(id, collection)
162    }
163    /// Remove a collection bundle by an identifier.
164    ///
165    /// The primary use of this method is uninstalling `Let` bindings.
166    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
167        self.bindings.remove(&id)
168    }
169    /// Melds a collection bundle to whatever exists.
170    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
171        if !self.bindings.contains_key(&id) {
172            self.bindings.insert(id, collection);
173        } else {
174            let binding = self
175                .bindings
176                .get_mut(&id)
177                .expect("Binding verified to exist");
178            if collection.collection.is_some() {
179                binding.collection = collection.collection;
180            }
181            for (key, flavor) in collection.arranged.into_iter() {
182                binding.arranged.insert(key, flavor);
183            }
184        }
185    }
186    /// Look up a collection bundle by an identifier.
187    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
188        self.bindings.get(&id).cloned()
189    }
190
191    pub(super) fn error_logger(&self) -> ErrorLogger {
192        ErrorLogger::new(self.debug_name.clone())
193    }
194}
195
196impl<S: Scope, T> Context<S, T>
197where
198    T: MzTimestamp,
199    S::Timestamp: MzTimestamp + Refines<T>,
200{
201    /// Brings the underlying arrangements and collections into a region.
202    pub fn enter_region<'a>(
203        &self,
204        region: &Child<'a, S, S::Timestamp>,
205        bindings: Option<&std::collections::BTreeSet<Id>>,
206    ) -> Context<Child<'a, S, S::Timestamp>, T> {
207        let bindings = self
208            .bindings
209            .iter()
210            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
211            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
212            .collect();
213
214        Context {
215            scope: region.clone(),
216            debug_name: self.debug_name.clone(),
217            dataflow_id: self.dataflow_id.clone(),
218            export_ids: self.export_ids.clone(),
219            as_of_frontier: self.as_of_frontier.clone(),
220            until: self.until.clone(),
221            compute_logger: self.compute_logger.clone(),
222            linear_join_spec: self.linear_join_spec.clone(),
223            bindings,
224            dataflow_expiration: self.dataflow_expiration.clone(),
225            config_set: Rc::clone(&self.config_set),
226        }
227    }
228}
229
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// A dataflow-local arrangement.
    ///
    /// Carries the arranged `Ok` rows and, separately, the arranged errors.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
252
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        // Scratch space for datum decoding, reused across invocations of `logic`.
        let mut datums = DatumVec::new();
        // Re-concatenate the key and value datums of each record into one output `Row`.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                // Errors are arranged with the error as the key and `()` as the value.
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
    /// an iterator of `(D, S::Timestamp, Diff)` updates.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Set a number of tuples after which the operator should yield.
        // This allows us to remain responsive even when enumerating a substantial
        // arrangement, as well as provides time to accumulate our produced output.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        // Adapt the datum-vector `logic` to key/value pairs: decode at most
        // `max_demand` columns total, spending the budget on the key first and
        // whatever remains on the value.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
342impl<S: Scope, T> ArrangementFlavor<S, T>
343where
344    T: MzTimestamp,
345    S::Timestamp: MzTimestamp + Refines<T>,
346{
347    /// The scope containing the collection bundle.
348    pub fn scope(&self) -> S {
349        match self {
350            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
351            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
352        }
353    }
354
355    /// Brings the arrangement flavor into a region.
356    pub fn enter_region<'a>(
357        &self,
358        region: &Child<'a, S, S::Timestamp>,
359    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
360        match self {
361            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
362                oks.clone().enter_region(region),
363                errs.clone().enter_region(region),
364            ),
365            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
366                *gid,
367                oks.clone().enter_region(region),
368                errs.clone().enter_region(region),
369            ),
370        }
371    }
372}
373impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
374where
375    T: MzTimestamp,
376    S::Timestamp: MzTimestamp + Refines<T>,
377{
378    /// Extracts the arrangement flavor from a region.
379    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
380        match self {
381            ArrangementFlavor::Local(oks, errs) => {
382                ArrangementFlavor::Local(oks.clone().leave_region(), errs.clone().leave_region())
383            }
384            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
385                *gid,
386                oks.clone().leave_region(),
387                errs.clone().leave_region(),
388            ),
389        }
390    }
391}
392
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    // The raw (ok, err) collection pair, if this form is available.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    // Arranged forms of the collection, keyed by the expressions they are arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
409
410impl<S: Scope, T> CollectionBundle<S, T>
411where
412    T: MzTimestamp,
413    S::Timestamp: MzTimestamp + Refines<T>,
414{
415    /// Construct a new collection bundle from update streams.
416    pub fn from_collections(
417        oks: VecCollection<S, Row, Diff>,
418        errs: VecCollection<S, DataflowError, Diff>,
419    ) -> Self {
420        Self {
421            collection: Some((oks, errs)),
422            arranged: BTreeMap::default(),
423        }
424    }
425
426    /// Inserts arrangements by the expressions on which they are keyed.
427    pub fn from_expressions(
428        exprs: Vec<MirScalarExpr>,
429        arrangements: ArrangementFlavor<S, T>,
430    ) -> Self {
431        let mut arranged = BTreeMap::new();
432        arranged.insert(exprs, arrangements);
433        Self {
434            collection: None,
435            arranged,
436        }
437    }
438
439    /// Inserts arrangements by the columns on which they are keyed.
440    pub fn from_columns<I: IntoIterator<Item = usize>>(
441        columns: I,
442        arrangements: ArrangementFlavor<S, T>,
443    ) -> Self {
444        let mut keys = Vec::new();
445        for column in columns {
446            keys.push(MirScalarExpr::column(column));
447        }
448        Self::from_expressions(keys, arrangements)
449    }
450
451    /// The scope containing the collection bundle.
452    pub fn scope(&self) -> S {
453        if let Some((oks, _errs)) = &self.collection {
454            oks.inner.scope()
455        } else {
456            self.arranged
457                .values()
458                .next()
459                .expect("Must contain a valid collection")
460                .scope()
461        }
462    }
463
464    /// Brings the collection bundle into a region.
465    pub fn enter_region<'a>(
466        &self,
467        region: &Child<'a, S, S::Timestamp>,
468    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
469        CollectionBundle {
470            collection: self.collection.as_ref().map(|(oks, errs)| {
471                (
472                    oks.clone().enter_region(region),
473                    errs.clone().enter_region(region),
474                )
475            }),
476            arranged: self
477                .arranged
478                .iter()
479                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
480                .collect(),
481        }
482    }
483}
484
485impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
486where
487    T: MzTimestamp,
488    S::Timestamp: MzTimestamp + Refines<T>,
489{
490    /// Extracts the collection bundle from a region.
491    pub fn leave_region(&self) -> CollectionBundle<S, T> {
492        CollectionBundle {
493            collection: self
494                .collection
495                .as_ref()
496                .map(|(oks, errs)| (oks.clone().leave_region(), errs.clone().leave_region())),
497            arranged: self
498                .arranged
499                .iter()
500                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
501                .collect(),
502        }
503    }
504}
505
506impl<S: Scope, T> CollectionBundle<S, T>
507where
508    T: MzTimestamp,
509    S::Timestamp: MzTimestamp + Refines<T>,
510{
511    /// Asserts that the arrangement for a specific key
512    /// (or the raw collection for no key) exists,
513    /// and returns the corresponding collection.
514    ///
515    /// This returns the collection as-is, without
516    /// doing any unthinning transformation.
517    /// Therefore, it should be used when the appropriate transformation
518    /// was planned as part of a following MFP.
519    ///
520    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
521    /// the fueled `flat_map` or `as_collection` method, depending on the flag
522    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
523    pub fn as_specific_collection(
524        &self,
525        key: Option<&[MirScalarExpr]>,
526        config_set: &ConfigSet,
527    ) -> (
528        VecCollection<S, Row, Diff>,
529        VecCollection<S, DataflowError, Diff>,
530    ) {
531        // Any operator that uses this method was told to use a particular
532        // collection during LIR planning, where we should have made
533        // sure that that collection exists.
534        //
535        // If it doesn't, we panic.
536        match key {
537            None => self
538                .collection
539                .clone()
540                .expect("The unarranged collection doesn't exist."),
541            Some(key) => {
542                let arranged = self.arranged.get(key).unwrap_or_else(|| {
543                    panic!("The collection arranged by {:?} doesn't exist.", key)
544                });
545                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
546                    // Decode all columns, pass max_demand as usize::MAX.
547                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
548                        Some((SharedRow::pack(borrow.iter()), t, r))
549                    });
550                    (ok.as_collection(), err)
551                } else {
552                    #[allow(deprecated)]
553                    arranged.as_collection()
554                }
555            }
556        }
557    }
558
559    /// Constructs and applies logic to elements of a collection and returns the results.
560    ///
561    /// The function applies `logic` on elements. The logic conceptually receives
562    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
563    ///
564    /// If `key_val` is set, this is a promise that `logic` will produce no results on
565    /// records for which the key does not evaluate to the value. This is used when we
566    /// have an arrangement by that key to leap directly to exactly those records.
567    /// It is important that `logic` still guard against data that does not satisfy
568    /// this constraint, as this method does not statically know that it will have
569    /// that arrangement.
570    ///
571    /// The `max_demand` parameter limits the number of columns decoded from the
572    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
573    /// decode all columns.
574    pub fn flat_map<D, I, L>(
575        &self,
576        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
577        max_demand: usize,
578        mut logic: L,
579    ) -> (StreamVec<S, I::Item>, VecCollection<S, DataflowError, Diff>)
580    where
581        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
582        D: Data,
583        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
584    {
585        // If `key_val` is set, we should have to use the corresponding arrangement.
586        // If there isn't one, that implies an error in the contract between
587        // key-production and available arrangements.
588        if let Some((key, val)) = key_val {
589            self.arrangement(&key)
590                .expect("Should have ensured during planning that this arrangement exists.")
591                .flat_map(val.as_ref(), max_demand, logic)
592        } else {
593            use timely::dataflow::operators::vec::Map;
594            let (oks, errs) = self
595                .collection
596                .clone()
597                .expect("Invariant violated: CollectionBundle contains no collection.");
598            let mut datums = DatumVec::new();
599            let oks = oks.inner.flat_map(move |(v, t, d)| {
600                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
601            });
602            (oks, errs)
603        }
604    }
605
606    /// Factored out common logic for using literal keys in general traces.
607    ///
608    /// This logic is sufficiently interesting that we want to write it only
609    /// once, and thereby avoid any skew in the two uses of the logic.
610    ///
611    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
612    /// where key and value are potentially specialized, but convertible into rows.
613    fn flat_map_core<Tr, D, I, L>(
614        trace: Arranged<S, Tr>,
615        key: Option<&Tr::KeyOwn>,
616        mut logic: L,
617        refuel: usize,
618    ) -> StreamVec<S, I::Item>
619    where
620        Tr: for<'a> TraceReader<
621                Key<'a>: ToDatumIter,
622                KeyOwn: PartialEq,
623                Val<'a>: ToDatumIter,
624                Time = S::Timestamp,
625                Diff = mz_repr::Diff,
626            > + Clone
627            + 'static,
628        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
629        D: Data,
630        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
631    {
632        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
633        let scope = trace.stream.scope();
634
635        let mut key_con = Tr::KeyContainer::with_capacity(1);
636        if let Some(key) = &key {
637            key_con.push_own(key);
638        }
639        let mode = if key.is_some() { "index" } else { "scan" };
640        let name = format!("ArrangementFlatMap({})", mode);
641        use timely::dataflow::operators::Operator;
642        trace
643            .stream
644            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
645                // Acquire an activator to reschedule the operator when it has unfinished work.
646                let activator = scope.activator_for(info.address);
647                // Maintain a list of work to do, cursor to navigate and process.
648                let mut todo = std::collections::VecDeque::new();
649                move |input, output| {
650                    let key = key_con.get(0);
651                    // First, dequeue all batches.
652                    input.for_each(|time, data| {
653                        let capability = time.retain(0);
654                        for batch in data.iter() {
655                            // enqueue a capability, cursor, and batch.
656                            todo.push_back(PendingWork::new(
657                                capability.clone(),
658                                batch.cursor(),
659                                batch.clone(),
660                            ));
661                        }
662                    });
663
664                    // Second, make progress on `todo`.
665                    let mut fuel = refuel;
666                    while !todo.is_empty() && fuel > 0 {
667                        todo.front_mut().unwrap().do_work(
668                            key.as_ref(),
669                            &mut logic,
670                            &mut fuel,
671                            output,
672                        );
673                        if fuel > 0 {
674                            todo.pop_front();
675                        }
676                    }
677                    // If we have not finished all work, re-activate the operator.
678                    if !todo.is_empty() {
679                        activator.activate();
680                    }
681                }
682            })
683    }
684
685    /// Look up an arrangement by the expressions that form the key.
686    ///
687    /// The result may be `None` if no such arrangement exists, or it may be one of many
688    /// "arrangement flavors" that represent the types of arranged data we might have.
689    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
690        self.arranged.get(key).map(|x| x.clone())
691    }
692}
693
694impl<S, T> CollectionBundle<S, T>
695where
696    T: MzTimestamp,
697    S: Scope,
698    S::Timestamp: Refines<T> + RenderTimestamp,
699{
700    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
701    ///
702    /// This operator is able to apply the logic of `mfp` early, which can substantially
703    /// reduce the amount of data produced when `mfp` is non-trivial.
704    ///
705    /// The `key_val` argument, when present, indicates that a specific arrangement should
706    /// be used, and if, in addition, the `val` component is present,
707    /// that we can seek to the supplied row.
708    pub fn as_collection_core(
709        &self,
710        mut mfp: MapFilterProject,
711        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
712        until: Antichain<mz_repr::Timestamp>,
713        config_set: &ConfigSet,
714    ) -> (
715        VecCollection<S, mz_repr::Row, Diff>,
716        VecCollection<S, DataflowError, Diff>,
717    ) {
718        mfp.optimize();
719        let mfp_plan = mfp.clone().into_plan().unwrap();
720
721        // If the MFP is trivial, we can just call `as_collection`.
722        // In the case that we weren't going to apply the `key_val` optimization,
723        // this path results in a slightly smaller and faster
724        // dataflow graph, and is intended to fix
725        // https://github.com/MaterializeInc/database-issues/issues/3111
726        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
727            true
728        } else {
729            false
730        };
731
732        if mfp_plan.is_identity() && !has_key_val {
733            let key = key_val.map(|(k, _v)| k);
734            return self.as_specific_collection(key.as_deref(), config_set);
735        }
736
737        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
738        mfp.permute_fn(|c| c, max_demand);
739        mfp.optimize();
740        let mfp_plan = mfp.into_plan().unwrap();
741
742        let mut datum_vec = DatumVec::new();
743        // Wrap in an `Rc` so that lifetimes work out.
744        let until = std::rc::Rc::new(until);
745
746        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
747            let mut row_builder = SharedRow::get();
748            let until = std::rc::Rc::clone(&until);
749            let temp_storage = RowArena::new();
750            let row_iter = row_datums.iter();
751            let mut datums_local = datum_vec.borrow();
752            datums_local.extend(row_iter);
753            let time = time.clone();
754            let event_time = time.event_time();
755            mfp_plan
756                .evaluate(
757                    &mut datums_local,
758                    &temp_storage,
759                    event_time,
760                    diff.clone(),
761                    move |time| !until.less_equal(time),
762                    &mut row_builder,
763                )
764                .map(move |x| match x {
765                    Ok((row, event_time, diff)) => {
766                        // Copy the whole time, and re-populate event time.
767                        let mut time: S::Timestamp = time.clone();
768                        *time.event_time_mut() = event_time;
769                        (Ok(row), time, diff)
770                    }
771                    Err((e, event_time, diff)) => {
772                        // Copy the whole time, and re-populate event time.
773                        let mut time: S::Timestamp = time.clone();
774                        *time.event_time_mut() = event_time;
775                        (Err(e), time, diff)
776                    }
777                })
778        });
779
780        use differential_dataflow::AsCollection;
781        let (oks, errs) = stream
782            .as_collection()
783            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
784                "OkErr",
785                |x| x,
786            );
787
788        (oks, errors.concat(errs))
789    }
790    pub fn ensure_collections(
791        mut self,
792        collections: AvailableCollections,
793        input_key: Option<Vec<MirScalarExpr>>,
794        input_mfp: MapFilterProject,
795        until: Antichain<mz_repr::Timestamp>,
796        config_set: &ConfigSet,
797    ) -> Self {
798        if collections == Default::default() {
799            return self;
800        }
801        // Cache collection to avoid reforming it each time.
802        //
803        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
804        // as the `map_fallible` that follows could be run against an arrangement itself.
805        //
806        // Note(btv): If we ever do that, we would then only need to make the raw collection here
807        // if `collections.raw` is true.
808
809        for (key, _, _) in collections.arranged.iter() {
810            soft_assert_or_log!(
811                !self.arranged.contains_key(key),
812                "LIR ArrangeBy tried to create an existing arrangement"
813            );
814        }
815
816        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
817        let form_raw_collection = collections.raw
818            || collections
819                .arranged
820                .iter()
821                .any(|(key, _, _)| !self.arranged.contains_key(key));
822        if form_raw_collection && self.collection.is_none() {
823            self.collection = Some(self.as_collection_core(
824                input_mfp,
825                input_key.map(|k| (k, None)),
826                until,
827                config_set,
828            ));
829        }
830        for (key, _, thinning) in collections.arranged {
831            if !self.arranged.contains_key(&key) {
832                // TODO: Consider allowing more expressive names.
833                let name = format!("ArrangeBy[{:?}]", key);
834
835                let (oks, errs) = self
836                    .collection
837                    .take()
838                    .expect("Collection constructed above");
839                let (oks, errs_keyed, passthrough) =
840                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
841                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
842                self.collection = Some((passthrough, errs));
843                let errs =
844                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
845                        &format!("{}-errors", name),
846                    );
847                self.arranged
848                    .insert(key, ArrangementFlavor::Local(oks, errs));
849            }
850        }
851        self
852    }
853
    /// Builds an arrangement from a collection, using the specified key and value thinning.
    ///
    /// The arrangement's key is based on the `key` expressions, and the value the input with
    /// the `thinning` applied to it. It selects which of the input columns are included in the
    /// value of the arrangement. The thinning is in support of permuting arrangements such that
    /// columns in the key are not included in the value.
    ///
    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
    /// the input as-is, which allows downstream consumers to reuse the collection without
    /// teeing the stream.
    fn arrange_collection(
        name: &String,
        oks: VecCollection<S, Row, Diff>,
        key: Vec<MirScalarExpr>,
        thinning: Vec<usize>,
    ) -> (
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        VecCollection<S, DataflowError, Diff>,
        VecCollection<S, Row, Diff>,
    ) {
        // This operator implements a `map_fallible`, but produces columnar updates for the ok
        // stream. The `map_fallible` cannot be used here because the closure cannot return
        // references, which is what we need to push into columnar streams. Instead, we use a
        // bespoke operator that also optimizes reuse of allocations across individual updates.
        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
        // Output 0: keyed ok updates, built columnar as ((key, value), time, diff).
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output =
            OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output);
        // Output 1: errors produced while evaluating the key expressions.
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::from(err_output);
        // Output 2: the input batches forwarded unchanged (the passthrough stream).
        let (passthrough_output, passthrough_stream) = builder.new_output();
        let mut passthrough_output = OutputBuilder::from(passthrough_output);
        let mut input = builder.new_input(oks.inner, Pipeline);
        builder.set_notify_for(0, FrontierInterest::Never);
        builder.build(move |_capabilities| {
            // Row/datum buffers reused across all updates to avoid per-row allocations.
            let mut key_buf = Row::default();
            let mut val_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut temp_storage = RowArena::new();
            move |_frontiers| {
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();
                let mut passthrough_output = passthrough_output.activate();
                input.for_each(|time, data| {
                    let mut ok_session = ok_output.session_with_builder(&time);
                    let mut err_session = err_output.session(&time);
                    for (row, time, diff) in data.iter() {
                        temp_storage.clear();
                        let datums = datums.borrow_with(row);
                        // Evaluate each key expression against the row's datums.
                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
                        match key_buf.packer().try_extend(key_iter) {
                            Ok(()) => {
                                // The value is the row restricted to the `thinning` columns.
                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
                                val_buf.packer().extend(val_datum_iter);
                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
                            }
                            Err(e) => {
                                // Key evaluation failed: route this update to the error stream.
                                err_session.give((e.into(), time.clone(), *diff));
                            }
                        }
                    }
                    // Forward the whole input container on the passthrough output.
                    passthrough_output.session(&time).give_container(data);
                });
            }
        });

        // Arrange the keyed ok updates, exchanging them by key in columnar form.
        let oks = ok_stream
            .mz_arrange_core::<
                _,
                Col2ValBatcher<_, _, _, _>,
                RowRowBuilder<_, _>,
                RowRowSpine<_, _>,
            >(
                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<Row, Row, S::Timestamp, Diff>,
                ),
                name
            );
        (
            oks,
            err_stream.as_collection(),
            passthrough_stream.as_collection(),
        )
    }
938}
939
/// A batch of updates awaiting (possibly incremental) processing, bundled with
/// the capability under which its results are emitted.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability used when sending output derived from this batch.
    capability: Capability<C::Time>,
    /// Cursor over the batch contents; its position records processing progress.
    cursor: C,
    /// The storage backing `cursor`.
    batch: C::Storage,
}
948
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// If `key` is `Some`, only the updates for that single key are processed; otherwise the
    /// cursor advances across all remaining keys. Each output produced by `logic` counts as one
    /// unit of work. If the work performed reaches `fuel`, `fuel` is set to `0` and the method
    /// returns early (the cursor retains its position); otherwise the work performed is
    /// subtracted from `fuel`. Note that the fuel check happens only after finishing a value,
    /// so the limit may be overshot by the updates of the last value processed.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for (time, diff) pairs; consolidated before `logic` is applied.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek to the requested key unless the cursor is already positioned on it.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    // Gather and consolidate all (time, diff) pairs for this (key, val).
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: stop; the cursor position records our progress.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // No specific key requested: process every remaining (key, val) pair.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    // Gather and consolidate all (time, diff) pairs for this (key, val).
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: stop; the cursor position records our progress.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Batch fully processed within budget: charge the work against the fuel.
        *fuel -= work;
    }
}