Skip to main content

mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::CollectionExt;
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
38use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
39use timely::dataflow::scopes::Child;
40use timely::dataflow::{Scope, Stream};
41use timely::progress::timestamp::Refines;
42use timely::progress::{Antichain, Timestamp};
43
44use crate::compute_state::ComputeState;
45use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
46use crate::render::errors::ErrorLogger;
47use crate::render::{LinearJoinSpec, RenderTimestamp};
48use crate::row_spine::{DatumSeq, RowRowBuilder};
49use crate::typedefs::{
50    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
51    RowRowSpine,
52};
53
/// Dataflow-local collections and arrangements.
///
/// A context means to wrap available data assets and present them in an easy-to-use manner.
/// These assets include dataflow-local collections and arrangements, as well as imported
/// arrangements from outside the dataflow.
///
/// Context has two timestamp types, one from `S::Timestamp` and one from `T`, where the
/// former must refine the latter. The former is the timestamp used by the scope in question,
/// and the latter is the timestamp of imported traces. The two may be different in the case
/// of regions or iteration.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<T>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<T>,
    /// Bindings of identifiers to collections.
    ///
    /// Populated via `insert_id`/`update_id`, and queried via `lookup_id`.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    ///
    /// `None` for transient dataflows (see `for_dataflow_in`).
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never advance
    /// past this frontier, except the empty frontier.
    pub dataflow_expiration: Antichain<T>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
99
100impl<S: Scope> Context<S>
101where
102    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
103{
104    /// Creates a new empty Context.
105    pub fn for_dataflow_in<Plan>(
106        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
107        scope: S,
108        compute_state: &ComputeState,
109        until: Antichain<mz_repr::Timestamp>,
110        dataflow_expiration: Antichain<mz_repr::Timestamp>,
111    ) -> Self {
112        use mz_ore::collections::CollectionExt as IteratorExt;
113        let dataflow_id = *scope.addr().into_first();
114        let as_of_frontier = dataflow
115            .as_of
116            .clone()
117            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
118
119        let export_ids = dataflow.export_ids().collect();
120
121        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
122        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
123        // want to reconsider in the future.
124        let compute_logger = if dataflow.is_transient() {
125            None
126        } else {
127            compute_state.compute_logger.clone()
128        };
129
130        Self {
131            scope,
132            debug_name: dataflow.debug_name.clone(),
133            dataflow_id,
134            export_ids,
135            as_of_frontier,
136            until,
137            bindings: BTreeMap::new(),
138            compute_logger,
139            linear_join_spec: compute_state.linear_join_spec,
140            dataflow_expiration,
141            config_set: Rc::clone(&compute_state.worker_config),
142        }
143    }
144}
145
146impl<S: Scope, T> Context<S, T>
147where
148    T: MzTimestamp,
149    S::Timestamp: MzTimestamp + Refines<T>,
150{
151    /// Insert a collection bundle by an identifier.
152    ///
153    /// This is expected to be used to install external collections (sources, indexes, other views),
154    /// as well as for `Let` bindings of local collections.
155    pub fn insert_id(
156        &mut self,
157        id: Id,
158        collection: CollectionBundle<S, T>,
159    ) -> Option<CollectionBundle<S, T>> {
160        self.bindings.insert(id, collection)
161    }
162    /// Remove a collection bundle by an identifier.
163    ///
164    /// The primary use of this method is uninstalling `Let` bindings.
165    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
166        self.bindings.remove(&id)
167    }
168    /// Melds a collection bundle to whatever exists.
169    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
170        if !self.bindings.contains_key(&id) {
171            self.bindings.insert(id, collection);
172        } else {
173            let binding = self
174                .bindings
175                .get_mut(&id)
176                .expect("Binding verified to exist");
177            if collection.collection.is_some() {
178                binding.collection = collection.collection;
179            }
180            for (key, flavor) in collection.arranged.into_iter() {
181                binding.arranged.insert(key, flavor);
182            }
183        }
184    }
185    /// Look up a collection bundle by an identifier.
186    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
187        self.bindings.get(&id).cloned()
188    }
189
190    pub(super) fn error_logger(&self) -> ErrorLogger {
191        ErrorLogger::new(self.debug_name.clone())
192    }
193}
194
195impl<S: Scope, T> Context<S, T>
196where
197    T: MzTimestamp,
198    S::Timestamp: MzTimestamp + Refines<T>,
199{
200    /// Brings the underlying arrangements and collections into a region.
201    pub fn enter_region<'a>(
202        &self,
203        region: &Child<'a, S, S::Timestamp>,
204        bindings: Option<&std::collections::BTreeSet<Id>>,
205    ) -> Context<Child<'a, S, S::Timestamp>, T> {
206        let bindings = self
207            .bindings
208            .iter()
209            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
210            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
211            .collect();
212
213        Context {
214            scope: region.clone(),
215            debug_name: self.debug_name.clone(),
216            dataflow_id: self.dataflow_id.clone(),
217            export_ids: self.export_ids.clone(),
218            as_of_frontier: self.as_of_frontier.clone(),
219            until: self.until.clone(),
220            compute_logger: self.compute_logger.clone(),
221            linear_join_spec: self.linear_join_spec.clone(),
222            bindings,
223            dataflow_expiration: self.dataflow_expiration.clone(),
224            config_set: Rc::clone(&self.config_set),
225        }
226    }
227}
228
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// A dataflow-local arrangement.
    ///
    /// Carries the arrangement of ok rows and the arrangement of errors, respectively.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    ///
    /// The two `Arranged` components are the imported ok and error traces,
    /// entered into this scope's timestamp (hence the `Enter` trace types).
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
251
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    ) {
        let mut datums = DatumVec::new();
        // Concatenate the key datums followed by the value datums into one output row.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        // Both flavors produce the same shape of output; errors are keyed by the
        // error itself with a unit value, so the key clone *is* the error record.
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
    /// an iterator of `(D, S::Timestamp, Diff)` updates.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Set a number of tuples after which the operator should yield.
        // This allows us to remain responsive even when enumerating a substantial
        // arrangement, as well as provides time to accumulate our produced output.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        // Decode at most `max_demand` datums in total: first from the key, then —
        // with the budget reduced by what the key consumed — from the value.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        // The ok stream goes through the fueled `flat_map_core`; errors are read out
        // directly, as error arrangements are expected to be small.
        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
341impl<S: Scope, T> ArrangementFlavor<S, T>
342where
343    T: MzTimestamp,
344    S::Timestamp: MzTimestamp + Refines<T>,
345{
346    /// The scope containing the collection bundle.
347    pub fn scope(&self) -> S {
348        match self {
349            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
350            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
351        }
352    }
353
354    /// Brings the arrangement flavor into a region.
355    pub fn enter_region<'a>(
356        &self,
357        region: &Child<'a, S, S::Timestamp>,
358    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
359        match self {
360            ArrangementFlavor::Local(oks, errs) => {
361                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
362            }
363            ArrangementFlavor::Trace(gid, oks, errs) => {
364                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
365            }
366        }
367    }
368}
369impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
370where
371    T: MzTimestamp,
372    S::Timestamp: MzTimestamp + Refines<T>,
373{
374    /// Extracts the arrangement flavor from a region.
375    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
376        match self {
377            ArrangementFlavor::Local(oks, errs) => {
378                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
379            }
380            ArrangementFlavor::Trace(gid, oks, errs) => {
381                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
382            }
383        }
384    }
385}
386
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// The collection as unarranged update streams: ok rows and errors.
    ///
    /// `None` when the data is only available through `arranged`.
    pub collection: Option<(
        VecCollection<S, Row, Diff>,
        VecCollection<S, DataflowError, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions they are arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
403
404impl<S: Scope, T> CollectionBundle<S, T>
405where
406    T: MzTimestamp,
407    S::Timestamp: MzTimestamp + Refines<T>,
408{
409    /// Construct a new collection bundle from update streams.
410    pub fn from_collections(
411        oks: VecCollection<S, Row, Diff>,
412        errs: VecCollection<S, DataflowError, Diff>,
413    ) -> Self {
414        Self {
415            collection: Some((oks, errs)),
416            arranged: BTreeMap::default(),
417        }
418    }
419
420    /// Inserts arrangements by the expressions on which they are keyed.
421    pub fn from_expressions(
422        exprs: Vec<MirScalarExpr>,
423        arrangements: ArrangementFlavor<S, T>,
424    ) -> Self {
425        let mut arranged = BTreeMap::new();
426        arranged.insert(exprs, arrangements);
427        Self {
428            collection: None,
429            arranged,
430        }
431    }
432
433    /// Inserts arrangements by the columns on which they are keyed.
434    pub fn from_columns<I: IntoIterator<Item = usize>>(
435        columns: I,
436        arrangements: ArrangementFlavor<S, T>,
437    ) -> Self {
438        let mut keys = Vec::new();
439        for column in columns {
440            keys.push(MirScalarExpr::column(column));
441        }
442        Self::from_expressions(keys, arrangements)
443    }
444
445    /// The scope containing the collection bundle.
446    pub fn scope(&self) -> S {
447        if let Some((oks, _errs)) = &self.collection {
448            oks.inner.scope()
449        } else {
450            self.arranged
451                .values()
452                .next()
453                .expect("Must contain a valid collection")
454                .scope()
455        }
456    }
457
458    /// Brings the collection bundle into a region.
459    pub fn enter_region<'a>(
460        &self,
461        region: &Child<'a, S, S::Timestamp>,
462    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
463        CollectionBundle {
464            collection: self
465                .collection
466                .as_ref()
467                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
468            arranged: self
469                .arranged
470                .iter()
471                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
472                .collect(),
473        }
474    }
475}
476
477impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
478where
479    T: MzTimestamp,
480    S::Timestamp: MzTimestamp + Refines<T>,
481{
482    /// Extracts the collection bundle from a region.
483    pub fn leave_region(&self) -> CollectionBundle<S, T> {
484        CollectionBundle {
485            collection: self
486                .collection
487                .as_ref()
488                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
489            arranged: self
490                .arranged
491                .iter()
492                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
493                .collect(),
494        }
495    }
496}
497
498impl<S: Scope, T> CollectionBundle<S, T>
499where
500    T: MzTimestamp,
501    S::Timestamp: MzTimestamp + Refines<T>,
502{
503    /// Asserts that the arrangement for a specific key
504    /// (or the raw collection for no key) exists,
505    /// and returns the corresponding collection.
506    ///
507    /// This returns the collection as-is, without
508    /// doing any unthinning transformation.
509    /// Therefore, it should be used when the appropriate transformation
510    /// was planned as part of a following MFP.
511    ///
512    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
513    /// the fueled `flat_map` or `as_collection` method, depending on the flag
514    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
515    pub fn as_specific_collection(
516        &self,
517        key: Option<&[MirScalarExpr]>,
518        config_set: &ConfigSet,
519    ) -> (
520        VecCollection<S, Row, Diff>,
521        VecCollection<S, DataflowError, Diff>,
522    ) {
523        // Any operator that uses this method was told to use a particular
524        // collection during LIR planning, where we should have made
525        // sure that that collection exists.
526        //
527        // If it doesn't, we panic.
528        match key {
529            None => self
530                .collection
531                .clone()
532                .expect("The unarranged collection doesn't exist."),
533            Some(key) => {
534                let arranged = self.arranged.get(key).unwrap_or_else(|| {
535                    panic!("The collection arranged by {:?} doesn't exist.", key)
536                });
537                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
538                    // Decode all columns, pass max_demand as usize::MAX.
539                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
540                        Some((SharedRow::pack(borrow.iter()), t, r))
541                    });
542                    (ok.as_collection(), err)
543                } else {
544                    #[allow(deprecated)]
545                    arranged.as_collection()
546                }
547            }
548        }
549    }
550
551    /// Constructs and applies logic to elements of a collection and returns the results.
552    ///
553    /// The function applies `logic` on elements. The logic conceptually receives
554    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
555    ///
556    /// If `key_val` is set, this is a promise that `logic` will produce no results on
557    /// records for which the key does not evaluate to the value. This is used when we
558    /// have an arrangement by that key to leap directly to exactly those records.
559    /// It is important that `logic` still guard against data that does not satisfy
560    /// this constraint, as this method does not statically know that it will have
561    /// that arrangement.
562    ///
563    /// The `max_demand` parameter limits the number of columns decoded from the
564    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
565    /// decode all columns.
566    pub fn flat_map<D, I, L>(
567        &self,
568        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
569        max_demand: usize,
570        mut logic: L,
571    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
572    where
573        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
574        D: Data,
575        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
576    {
577        // If `key_val` is set, we should have to use the corresponding arrangement.
578        // If there isn't one, that implies an error in the contract between
579        // key-production and available arrangements.
580        if let Some((key, val)) = key_val {
581            self.arrangement(&key)
582                .expect("Should have ensured during planning that this arrangement exists.")
583                .flat_map(val.as_ref(), max_demand, logic)
584        } else {
585            use timely::dataflow::operators::Map;
586            let (oks, errs) = self
587                .collection
588                .clone()
589                .expect("Invariant violated: CollectionBundle contains no collection.");
590            let mut datums = DatumVec::new();
591            let oks = oks.inner.flat_map(move |(v, t, d)| {
592                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
593            });
594            (oks, errs)
595        }
596    }
597
598    /// Factored out common logic for using literal keys in general traces.
599    ///
600    /// This logic is sufficiently interesting that we want to write it only
601    /// once, and thereby avoid any skew in the two uses of the logic.
602    ///
603    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
604    /// where key and value are potentially specialized, but convertible into rows.
605    fn flat_map_core<Tr, D, I, L>(
606        trace: &Arranged<S, Tr>,
607        key: Option<&Tr::KeyOwn>,
608        mut logic: L,
609        refuel: usize,
610    ) -> Stream<S, I::Item>
611    where
612        Tr: for<'a> TraceReader<
613                Key<'a>: ToDatumIter,
614                KeyOwn: PartialEq,
615                Val<'a>: ToDatumIter,
616                Time = S::Timestamp,
617                Diff = mz_repr::Diff,
618            > + Clone
619            + 'static,
620        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
621        D: Data,
622        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
623    {
624        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
625
626        let mut key_con = Tr::KeyContainer::with_capacity(1);
627        if let Some(key) = &key {
628            key_con.push_own(key);
629        }
630        let mode = if key.is_some() { "index" } else { "scan" };
631        let name = format!("ArrangementFlatMap({})", mode);
632        use timely::dataflow::operators::Operator;
633        trace
634            .stream
635            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
636                // Acquire an activator to reschedule the operator when it has unfinished work.
637                let activator = trace.stream.scope().activator_for(info.address);
638                // Maintain a list of work to do, cursor to navigate and process.
639                let mut todo = std::collections::VecDeque::new();
640                move |input, output| {
641                    let key = key_con.get(0);
642                    // First, dequeue all batches.
643                    input.for_each(|time, data| {
644                        let capability = time.retain();
645                        for batch in data.iter() {
646                            // enqueue a capability, cursor, and batch.
647                            todo.push_back(PendingWork::new(
648                                capability.clone(),
649                                batch.cursor(),
650                                batch.clone(),
651                            ));
652                        }
653                    });
654
655                    // Second, make progress on `todo`.
656                    let mut fuel = refuel;
657                    while !todo.is_empty() && fuel > 0 {
658                        todo.front_mut().unwrap().do_work(
659                            key.as_ref(),
660                            &mut logic,
661                            &mut fuel,
662                            output,
663                        );
664                        if fuel > 0 {
665                            todo.pop_front();
666                        }
667                    }
668                    // If we have not finished all work, re-activate the operator.
669                    if !todo.is_empty() {
670                        activator.activate();
671                    }
672                }
673            })
674    }
675
676    /// Look up an arrangement by the expressions that form the key.
677    ///
678    /// The result may be `None` if no such arrangement exists, or it may be one of many
679    /// "arrangement flavors" that represent the types of arranged data we might have.
680    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
681        self.arranged.get(key).map(|x| x.clone())
682    }
683}
684
685impl<S, T> CollectionBundle<S, T>
686where
687    T: MzTimestamp,
688    S: Scope,
689    S::Timestamp: Refines<T> + RenderTimestamp,
690{
691    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
692    ///
693    /// This operator is able to apply the logic of `mfp` early, which can substantially
694    /// reduce the amount of data produced when `mfp` is non-trivial.
695    ///
696    /// The `key_val` argument, when present, indicates that a specific arrangement should
697    /// be used, and if, in addition, the `val` component is present,
698    /// that we can seek to the supplied row.
699    pub fn as_collection_core(
700        &self,
701        mut mfp: MapFilterProject,
702        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
703        until: Antichain<mz_repr::Timestamp>,
704        config_set: &ConfigSet,
705    ) -> (
706        VecCollection<S, mz_repr::Row, Diff>,
707        VecCollection<S, DataflowError, Diff>,
708    ) {
709        mfp.optimize();
710        let mfp_plan = mfp.clone().into_plan().unwrap();
711
712        // If the MFP is trivial, we can just call `as_collection`.
713        // In the case that we weren't going to apply the `key_val` optimization,
714        // this path results in a slightly smaller and faster
715        // dataflow graph, and is intended to fix
716        // https://github.com/MaterializeInc/database-issues/issues/3111
717        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
718            true
719        } else {
720            false
721        };
722
723        if mfp_plan.is_identity() && !has_key_val {
724            let key = key_val.map(|(k, _v)| k);
725            return self.as_specific_collection(key.as_deref(), config_set);
726        }
727
728        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
729        mfp.permute_fn(|c| c, max_demand);
730        mfp.optimize();
731        let mfp_plan = mfp.into_plan().unwrap();
732
733        let mut datum_vec = DatumVec::new();
734        // Wrap in an `Rc` so that lifetimes work out.
735        let until = std::rc::Rc::new(until);
736
737        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
738            let mut row_builder = SharedRow::get();
739            let until = std::rc::Rc::clone(&until);
740            let temp_storage = RowArena::new();
741            let row_iter = row_datums.iter();
742            let mut datums_local = datum_vec.borrow();
743            datums_local.extend(row_iter);
744            let time = time.clone();
745            let event_time = time.event_time();
746            mfp_plan
747                .evaluate(
748                    &mut datums_local,
749                    &temp_storage,
750                    event_time,
751                    diff.clone(),
752                    move |time| !until.less_equal(time),
753                    &mut row_builder,
754                )
755                .map(move |x| match x {
756                    Ok((row, event_time, diff)) => {
757                        // Copy the whole time, and re-populate event time.
758                        let mut time: S::Timestamp = time.clone();
759                        *time.event_time_mut() = event_time;
760                        (Ok(row), time, diff)
761                    }
762                    Err((e, event_time, diff)) => {
763                        // Copy the whole time, and re-populate event time.
764                        let mut time: S::Timestamp = time.clone();
765                        *time.event_time_mut() = event_time;
766                        (Err(e), time, diff)
767                    }
768                })
769        });
770
771        use differential_dataflow::AsCollection;
772        let (oks, errs) = stream
773            .as_collection()
774            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
775                "OkErr",
776                |x| x,
777            );
778
779        (oks, errors.concat(&errs))
780    }
    /// Ensures that all collections demanded by `collections` are rendered and
    /// available on `self`: the raw collection (if `collections.raw` is set) and
    /// one arrangement per key in `collections.arranged`.
    ///
    /// * `input_key`, if present, names an existing arrangement key from which
    ///   the raw collection can be formed.
    /// * `input_mfp` is the map/filter/project applied while forming the raw
    ///   collection.
    /// * `until` bounds the times for which updates are produced.
    /// * `config_set` supplies dynamic configuration consulted during rendering.
    ///
    /// Returns `self` with the requested forms installed.
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        // Nothing demanded: nothing to render.
        if collections == Default::default() {
            return self;
        }
        // Cache collection to avoid reforming it each time.
        //
        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
        // as the `map_fallible` that follows could be run against an arrangement itself.
        //
        // Note(btv): If we ever do that, we would then only need to make the raw collection here
        // if `collections.raw` is true.

        // Requesting an arrangement that already exists indicates an LIR-level bug;
        // log (or assert, in debug builds) rather than silently re-rendering.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                // TODO: Consider allowing more expressive names.
                let name = format!("ArrangeBy[{:?}]", key);

                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                // The arrangement's errors include both the pre-existing errors and
                // any key-evaluation errors produced while forming the key.
                let errs_concat: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
                // Re-install the cached collection via the passthrough stream (and the
                // original, pre-arrangement errors) so later iterations can reuse it
                // without teeing the stream.
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }
844
845    /// Builds an arrangement from a collection, using the specified key and value thinning.
846    ///
847    /// The arrangement's key is based on the `key` expressions, and the value the input with
848    /// the `thinning` applied to it. It selects which of the input columns are included in the
849    /// value of the arrangement. The thinning is in support of permuting arrangements such that
850    /// columns in the key are not included in the value.
851    ///
852    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
853    /// the input as-is, which allows downstream consumers to reuse the collection without
854    /// teeing the stream.
855    fn arrange_collection(
856        name: &String,
857        oks: VecCollection<S, Row, Diff>,
858        key: Vec<MirScalarExpr>,
859        thinning: Vec<usize>,
860    ) -> (
861        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
862        VecCollection<S, DataflowError, Diff>,
863        VecCollection<S, Row, Diff>,
864    ) {
865        // This operator implements a `map_fallible`, but produces columnar updates for the ok
866        // stream. The `map_fallible` cannot be used here because the closure cannot return
867        // references, which is what we need to push into columnar streams. Instead, we use a
868        // bespoke operator that also optimizes reuse of allocations across individual updates.
869        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
870        let (ok_output, ok_stream) = builder.new_output();
871        let mut ok_output =
872            OutputBuilder::<_, ColumnBuilder<((Row, Row), S::Timestamp, Diff)>>::from(ok_output);
873        let (err_output, err_stream) = builder.new_output();
874        let mut err_output = OutputBuilder::from(err_output);
875        let (passthrough_output, passthrough_stream) = builder.new_output();
876        let mut passthrough_output = OutputBuilder::from(passthrough_output);
877        let mut input = builder.new_input(&oks.inner, Pipeline);
878        builder.set_notify(false);
879        builder.build(move |_capabilities| {
880            let mut key_buf = Row::default();
881            let mut val_buf = Row::default();
882            let mut datums = DatumVec::new();
883            let mut temp_storage = RowArena::new();
884            move |_frontiers| {
885                let mut ok_output = ok_output.activate();
886                let mut err_output = err_output.activate();
887                let mut passthrough_output = passthrough_output.activate();
888                input.for_each(|time, data| {
889                    let mut ok_session = ok_output.session_with_builder(&time);
890                    let mut err_session = err_output.session(&time);
891                    for (row, time, diff) in data.iter() {
892                        temp_storage.clear();
893                        let datums = datums.borrow_with(row);
894                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
895                        match key_buf.packer().try_extend(key_iter) {
896                            Ok(()) => {
897                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
898                                val_buf.packer().extend(val_datum_iter);
899                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
900                            }
901                            Err(e) => {
902                                err_session.give((e.into(), time.clone(), *diff));
903                            }
904                        }
905                    }
906                    passthrough_output.session(&time).give_container(data);
907                });
908            }
909        });
910
911        let oks = ok_stream
912            .mz_arrange_core::<
913                _,
914                Col2ValBatcher<_, _, _, _>,
915                RowRowBuilder<_, _>,
916                RowRowSpine<_, _>,
917            >(
918                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
919                    columnar_exchange::<Row, Row, S::Timestamp, Diff>,
920                ),
921                name
922            );
923        (
924            oks,
925            err_stream.as_collection(),
926            passthrough_stream.as_collection(),
927        )
928    }
929}
930
/// A unit of pending work: a trace batch together with a cursor into it and
/// the capability at which any output derived from the batch must be sent.
struct PendingWork<C>
where
    C: Cursor,
{
    /// The capability under which output for this batch is emitted.
    capability: Capability<C::Time>,
    /// Cursor into `batch`; its position records how far processing has progressed.
    cursor: C,
    /// The storage backing `cursor`.
    batch: C::Storage,
}
939
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// If `key` is `Some`, only updates for that single key are processed; otherwise the
    /// cursor walks all remaining keys in the batch. `work` counts the updates produced
    /// by `logic`; when it reaches `fuel`, `fuel` is set to `0` and the method returns
    /// early, with the cursor position recording where a later call should resume.
    /// Otherwise, the work performed is subtracted from `fuel` on the way out.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for (time, diff) pairs, reused across values.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            // Reborrow the key so its lifetime matches what the cursor operations expect.
            let key = C::KeyContainer::reborrow(*key);
            // Seek to the requested key, unless the cursor is already positioned on it.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // Only proceed if the key is actually present in this batch.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Merge duplicate times before handing updates to `logic`.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Fuel exhausted: stop here; the cursor position lets us resume later.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // No key requested: process every remaining key in the batch.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    // Merge duplicate times before handing updates to `logic`.
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Fuel exhausted: stop here; the cursor position lets us resume later.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Batch fully processed (or key absent): charge the work actually performed.
        *fuel -= work;
    }
}
1019}