// mz_compute/render/context.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! Management of dataflow-local state, like arrangements, while building a
//! dataflow.
12
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
use mz_compute_types::plan::AvailableCollections;
use mz_dyncfg::ConfigSet;
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_ore::soft_assert_or_log;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
use mz_timely_util::operator::CollectionExt;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::operate::FrontierInterest;
use timely::progress::{Antichain, Timestamp};

use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
use crate::render::errors::ErrorLogger;
use crate::render::{LinearJoinSpec, RenderTimestamp};
use crate::row_spine::{DatumSeq, RowRowBuilder};
use crate::typedefs::{
    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
};
51
/// Dataflow-local collections and arrangements.
///
/// A context means to wrap available data assets and present them in an easy-to-use manner.
/// These assets include dataflow-local collections and arrangements, as well as imported
/// arrangements from outside the dataflow.
///
/// Context has a timestamp type `T`, which is the timestamp used by the scope in question.
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: Scope<'scope, T>,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Bindings of identifiers to collections.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never advance
    /// past this frontier, except the empty frontier.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
90
91impl<'scope, T: RenderTimestamp> Context<'scope, T> {
92    /// Creates a new empty Context.
93    pub fn for_dataflow_in<Plan>(
94        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
95        scope: Scope<'scope, T>,
96        compute_state: &ComputeState,
97        until: Antichain<mz_repr::Timestamp>,
98        dataflow_expiration: Antichain<mz_repr::Timestamp>,
99    ) -> Self {
100        use mz_ore::collections::CollectionExt as IteratorExt;
101        let dataflow_id = *scope.addr().into_first();
102        let as_of_frontier = dataflow
103            .as_of
104            .clone()
105            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
106
107        let export_ids = dataflow.export_ids().collect();
108
109        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
110        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
111        // want to reconsider in the future.
112        let compute_logger = if dataflow.is_transient() {
113            None
114        } else {
115            compute_state.compute_logger.clone()
116        };
117
118        Self {
119            scope,
120            debug_name: dataflow.debug_name.clone(),
121            dataflow_id,
122            export_ids,
123            as_of_frontier,
124            until,
125            bindings: BTreeMap::new(),
126            compute_logger,
127            linear_join_spec: compute_state.linear_join_spec,
128            dataflow_expiration,
129            config_set: Rc::clone(&compute_state.worker_config),
130        }
131    }
132}
133
134impl<'scope, T: RenderTimestamp> Context<'scope, T> {
135    /// Insert a collection bundle by an identifier.
136    ///
137    /// This is expected to be used to install external collections (sources, indexes, other views),
138    /// as well as for `Let` bindings of local collections.
139    pub fn insert_id(
140        &mut self,
141        id: Id,
142        collection: CollectionBundle<'scope, T>,
143    ) -> Option<CollectionBundle<'scope, T>> {
144        self.bindings.insert(id, collection)
145    }
146    /// Remove a collection bundle by an identifier.
147    ///
148    /// The primary use of this method is uninstalling `Let` bindings.
149    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
150        self.bindings.remove(&id)
151    }
152    /// Melds a collection bundle to whatever exists.
153    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
154        if !self.bindings.contains_key(&id) {
155            self.bindings.insert(id, collection);
156        } else {
157            let binding = self
158                .bindings
159                .get_mut(&id)
160                .expect("Binding verified to exist");
161            if collection.collection.is_some() {
162                binding.collection = collection.collection;
163            }
164            for (key, flavor) in collection.arranged.into_iter() {
165                binding.arranged.insert(key, flavor);
166            }
167        }
168    }
169    /// Look up a collection bundle by an identifier.
170    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
171        self.bindings.get(&id).cloned()
172    }
173
174    pub(super) fn error_logger(&self) -> ErrorLogger {
175        ErrorLogger::new(self.debug_name.clone())
176    }
177}
178
179impl<'scope, T: RenderTimestamp> Context<'scope, T> {
180    /// Brings the underlying arrangements and collections into a region.
181    pub fn enter_region<'a>(
182        &self,
183        region: Scope<'a, T>,
184        bindings: Option<&std::collections::BTreeSet<Id>>,
185    ) -> Context<'a, T> {
186        let bindings = self
187            .bindings
188            .iter()
189            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
190            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
191            .collect();
192
193        Context {
194            scope: region,
195            debug_name: self.debug_name.clone(),
196            dataflow_id: self.dataflow_id.clone(),
197            export_ids: self.export_ids.clone(),
198            as_of_frontier: self.as_of_frontier.clone(),
199            until: self.until.clone(),
200            compute_logger: self.compute_logger.clone(),
201            linear_join_spec: self.linear_join_spec.clone(),
202            bindings,
203            dataflow_expiration: self.dataflow_expiration.clone(),
204            config_set: Rc::clone(&self.config_set),
205        }
206    }
207}
208
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// A dataflow-local arrangement.
    ///
    /// Carries the arrangement of ok rows and the arrangement of errors.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
227
impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowError, Diff>,
    ) {
        // Reusable scratch space for datum decoding; owned by the closure below.
        let mut datums = DatumVec::new();
        // Concatenate key and value datums and pack them into a single owned `Row`.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                // Errors are arranged with the error as key and a unit value.
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
    /// an iterator of `(D, T, Diff)` updates.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        StreamVec<'scope, T, I::Item>,
        VecCollection<'scope, T, DataflowError, Diff>,
    )
    where
        I: IntoIterator<Item = (D, T, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff) -> I + 'static,
    {
        // Set a number of tuples after which the operator should yield.
        // This allows us to remain responsive even when enumerating a substantial
        // arrangement, as well as provides time to accumulate our produced output.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        // Adapt `logic` to the (key, value, time, diff) shape `flat_map_core` expects,
        // decoding at most `max_demand` columns across key and value combined.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            // The key may already have consumed part of the demand budget.
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core(oks.clone(), key, logic, refuel);
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
316impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
317    /// The scope containing the collection bundle.
318    pub fn scope(&self) -> Scope<'scope, T> {
319        match self {
320            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
321            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
322        }
323    }
324
325    /// Brings the arrangement flavor into a region.
326    pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
327        match self {
328            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
329                oks.clone().enter_region(region),
330                errs.clone().enter_region(region),
331            ),
332            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
333                *gid,
334                oks.clone().enter_region(region),
335                errs.clone().enter_region(region),
336            ),
337        }
338    }
339}
340impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
341    /// Extracts the arrangement flavor from a region.
342    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
343        match self {
344            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
345                oks.clone().leave_region(outer),
346                errs.clone().leave_region(outer),
347            ),
348            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
349                *gid,
350                oks.clone().leave_region(outer),
351                errs.clone().leave_region(outer),
352            ),
353        }
354    }
355}
356
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The (ok, err) collection pair, if the collection is available in unarranged form.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowError, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions they are arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
369
370impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
371    /// Construct a new collection bundle from update streams.
372    pub fn from_collections(
373        oks: VecCollection<'scope, T, Row, Diff>,
374        errs: VecCollection<'scope, T, DataflowError, Diff>,
375    ) -> Self {
376        Self {
377            collection: Some((oks, errs)),
378            arranged: BTreeMap::default(),
379        }
380    }
381
382    /// Inserts arrangements by the expressions on which they are keyed.
383    pub fn from_expressions(
384        exprs: Vec<MirScalarExpr>,
385        arrangements: ArrangementFlavor<'scope, T>,
386    ) -> Self {
387        let mut arranged = BTreeMap::new();
388        arranged.insert(exprs, arrangements);
389        Self {
390            collection: None,
391            arranged,
392        }
393    }
394
395    /// Inserts arrangements by the columns on which they are keyed.
396    pub fn from_columns<I: IntoIterator<Item = usize>>(
397        columns: I,
398        arrangements: ArrangementFlavor<'scope, T>,
399    ) -> Self {
400        let mut keys = Vec::new();
401        for column in columns {
402            keys.push(MirScalarExpr::column(column));
403        }
404        Self::from_expressions(keys, arrangements)
405    }
406
407    /// The scope containing the collection bundle.
408    pub fn scope(&self) -> Scope<'scope, T> {
409        if let Some((oks, _errs)) = &self.collection {
410            oks.inner.scope()
411        } else {
412            self.arranged
413                .values()
414                .next()
415                .expect("Must contain a valid collection")
416                .scope()
417        }
418    }
419
420    /// Brings the collection bundle into a region.
421    pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
422        CollectionBundle {
423            collection: self.collection.as_ref().map(|(oks, errs)| {
424                (
425                    oks.clone().enter_region(region),
426                    errs.clone().enter_region(region),
427                )
428            }),
429            arranged: self
430                .arranged
431                .iter()
432                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
433                .collect(),
434        }
435    }
436}
437
438impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
439    /// Extracts the collection bundle from a region.
440    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
441        CollectionBundle {
442            collection: self.collection.as_ref().map(|(oks, errs)| {
443                (
444                    oks.clone().leave_region(outer),
445                    errs.clone().leave_region(outer),
446                )
447            }),
448            arranged: self
449                .arranged
450                .iter()
451                .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
452                .collect(),
453        }
454    }
455}
456
457impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
458    /// Asserts that the arrangement for a specific key
459    /// (or the raw collection for no key) exists,
460    /// and returns the corresponding collection.
461    ///
462    /// This returns the collection as-is, without
463    /// doing any unthinning transformation.
464    /// Therefore, it should be used when the appropriate transformation
465    /// was planned as part of a following MFP.
466    ///
467    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
468    /// the fueled `flat_map` or `as_collection` method, depending on the flag
469    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
470    pub fn as_specific_collection(
471        &self,
472        key: Option<&[MirScalarExpr]>,
473        config_set: &ConfigSet,
474    ) -> (
475        VecCollection<'scope, T, Row, Diff>,
476        VecCollection<'scope, T, DataflowError, Diff>,
477    ) {
478        // Any operator that uses this method was told to use a particular
479        // collection during LIR planning, where we should have made
480        // sure that that collection exists.
481        //
482        // If it doesn't, we panic.
483        match key {
484            None => self
485                .collection
486                .clone()
487                .expect("The unarranged collection doesn't exist."),
488            Some(key) => {
489                let arranged = self.arranged.get(key).unwrap_or_else(|| {
490                    panic!("The collection arranged by {:?} doesn't exist.", key)
491                });
492                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
493                    // Decode all columns, pass max_demand as usize::MAX.
494                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
495                        Some((SharedRow::pack(borrow.iter()), t, r))
496                    });
497                    (ok.as_collection(), err)
498                } else {
499                    #[allow(deprecated)]
500                    arranged.as_collection()
501                }
502            }
503        }
504    }
505
506    /// Constructs and applies logic to elements of a collection and returns the results.
507    ///
508    /// The function applies `logic` on elements. The logic conceptually receives
509    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
510    ///
511    /// If `key_val` is set, this is a promise that `logic` will produce no results on
512    /// records for which the key does not evaluate to the value. This is used when we
513    /// have an arrangement by that key to leap directly to exactly those records.
514    /// It is important that `logic` still guard against data that does not satisfy
515    /// this constraint, as this method does not statically know that it will have
516    /// that arrangement.
517    ///
518    /// The `max_demand` parameter limits the number of columns decoded from the
519    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
520    /// decode all columns.
521    pub fn flat_map<D, I, L>(
522        &self,
523        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
524        max_demand: usize,
525        mut logic: L,
526    ) -> (
527        StreamVec<'scope, T, I::Item>,
528        VecCollection<'scope, T, DataflowError, Diff>,
529    )
530    where
531        I: IntoIterator<Item = (D, T, Diff)>,
532        D: Data,
533        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, T, Diff) -> I + 'static,
534    {
535        // If `key_val` is set, we should have to use the corresponding arrangement.
536        // If there isn't one, that implies an error in the contract between
537        // key-production and available arrangements.
538        if let Some((key, val)) = key_val {
539            self.arrangement(&key)
540                .expect("Should have ensured during planning that this arrangement exists.")
541                .flat_map(val.as_ref(), max_demand, logic)
542        } else {
543            use timely::dataflow::operators::vec::Map;
544            let (oks, errs) = self
545                .collection
546                .clone()
547                .expect("Invariant violated: CollectionBundle contains no collection.");
548            let mut datums = DatumVec::new();
549            let oks = oks.inner.flat_map(move |(v, t, d)| {
550                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
551            });
552            (oks, errs)
553        }
554    }
555
556    /// Factored out common logic for using literal keys in general traces.
557    ///
558    /// This logic is sufficiently interesting that we want to write it only
559    /// once, and thereby avoid any skew in the two uses of the logic.
560    ///
561    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
562    /// where key and value are potentially specialized, but convertible into rows.
563    fn flat_map_core<Tr, D, I, L>(
564        trace: Arranged<'scope, Tr>,
565        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
566        mut logic: L,
567        refuel: usize,
568    ) -> StreamVec<'scope, T, I::Item>
569    where
570        Tr: for<'a> TraceReader<
571                Key<'a>: ToDatumIter,
572                Val<'a>: ToDatumIter,
573                Time = T,
574                Diff = mz_repr::Diff,
575            > + Clone
576            + 'static,
577        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
578        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
579        D: Data,
580        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff) -> I + 'static,
581    {
582        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
583        let scope = trace.stream.scope();
584
585        let mut key_con = Tr::KeyContainer::with_capacity(1);
586        if let Some(key) = &key {
587            key_con.push_own(key);
588        }
589        let mode = if key.is_some() { "index" } else { "scan" };
590        let name = format!("ArrangementFlatMap({})", mode);
591        use timely::dataflow::operators::Operator;
592        trace
593            .stream
594            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
595                // Acquire an activator to reschedule the operator when it has unfinished work.
596                let activator = scope.activator_for(info.address);
597                // Maintain a list of work to do, cursor to navigate and process.
598                let mut todo = std::collections::VecDeque::new();
599                move |input, output| {
600                    let key = key_con.get(0);
601                    // First, dequeue all batches.
602                    input.for_each(|time, data| {
603                        let capability = time.retain(0);
604                        for batch in data.iter() {
605                            // enqueue a capability, cursor, and batch.
606                            todo.push_back(PendingWork::new(
607                                capability.clone(),
608                                batch.cursor(),
609                                batch.clone(),
610                            ));
611                        }
612                    });
613
614                    // Second, make progress on `todo`.
615                    let mut fuel = refuel;
616                    while !todo.is_empty() && fuel > 0 {
617                        todo.front_mut().unwrap().do_work(
618                            key.as_ref(),
619                            &mut logic,
620                            &mut fuel,
621                            output,
622                        );
623                        if fuel > 0 {
624                            todo.pop_front();
625                        }
626                    }
627                    // If we have not finished all work, re-activate the operator.
628                    if !todo.is_empty() {
629                        activator.activate();
630                    }
631                }
632            })
633    }
634
635    /// Look up an arrangement by the expressions that form the key.
636    ///
637    /// The result may be `None` if no such arrangement exists, or it may be one of many
638    /// "arrangement flavors" that represent the types of arranged data we might have.
639    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
640        self.arranged.get(key).map(|x| x.clone())
641    }
642}
643
644impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
    ///
    /// This operator is able to apply the logic of `mfp` early, which can substantially
    /// reduce the amount of data produced when `mfp` is non-trivial.
    ///
    /// The `key_val` argument, when present, indicates that a specific arrangement should
    /// be used, and if, in addition, the `val` component is present,
    /// that we can seek to the supplied row.
    pub fn as_collection_core(
        &self,
        mut mfp: MapFilterProject,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<'scope, T, mz_repr::Row, Diff>,
        VecCollection<'scope, T, DataflowError, Diff>,
    ) {
        mfp.optimize();
        // Plan on a clone: we may permute and re-plan `mfp` below.
        let mfp_plan = mfp.clone().into_plan().unwrap();

        // If the MFP is trivial, we can just call `as_collection`.
        // In the case that we weren't going to apply the `key_val` optimization,
        // this path results in a slightly smaller and faster
        // dataflow graph, and is intended to fix
        // https://github.com/MaterializeInc/database-issues/issues/3111
        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
            true
        } else {
            false
        };

        if mfp_plan.is_identity() && !has_key_val {
            let key = key_val.map(|(k, _v)| k);
            return self.as_specific_collection(key.as_deref(), config_set);
        }

        // The largest column index the MFP references, plus one; columns at or
        // beyond this index need not be decoded from the input.
        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
        mfp.permute_fn(|c| c, max_demand);
        mfp.optimize();
        let mfp_plan = mfp.into_plan().unwrap();

        let mut datum_vec = DatumVec::new();
        // Wrap in an `Rc` so that lifetimes work out.
        let until = std::rc::Rc::new(until);

        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
            let mut row_builder = SharedRow::get();
            let until = std::rc::Rc::clone(&until);
            let temp_storage = RowArena::new();
            let row_iter = row_datums.iter();
            let mut datums_local = datum_vec.borrow();
            datums_local.extend(row_iter);
            let time = time.clone();
            // The MFP evaluates against the event-time component of `time`.
            let event_time = time.event_time();
            mfp_plan
                .evaluate(
                    &mut datums_local,
                    &temp_storage,
                    event_time,
                    diff.clone(),
                    move |time| !until.less_equal(time),
                    &mut row_builder,
                )
                .map(move |x| match x {
                    Ok((row, event_time, diff)) => {
                        // Copy the whole time, and re-populate event time.
                        let mut time: T = time.clone();
                        *time.event_time_mut() = event_time;
                        (Ok(row), time, diff)
                    }
                    Err((e, event_time, diff)) => {
                        // Copy the whole time, and re-populate event time.
                        let mut time: T = time.clone();
                        *time.event_time_mut() = event_time;
                        (Err(e), time, diff)
                    }
                })
        });

        use differential_dataflow::AsCollection;
        // Split the mixed Ok/Err stream into ok and err collections.
        let (oks, errs) = stream
            .as_collection()
            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
                "OkErr",
                |x| x,
            );

        (oks, errors.concat(errs))
    }
    /// Ensures that the collections and arrangements described by `collections` are
    /// available on `self`, rendering any that are missing.
    ///
    /// If `collections` is empty (the default value) this is a no-op. Otherwise, a raw
    /// collection is formed (if needed) from `input_mfp`/`input_key`, and each requested
    /// arrangement that does not already exist is built from it.
    pub fn ensure_collections(
        mut self,
        collections: AvailableCollections,
        input_key: Option<Vec<MirScalarExpr>>,
        input_mfp: MapFilterProject,
        until: Antichain<mz_repr::Timestamp>,
        config_set: &ConfigSet,
    ) -> Self {
        // Nothing requested: avoid doing any work at all.
        if collections == Default::default() {
            return self;
        }
        // Cache collection to avoid reforming it each time.
        //
        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
        // as the `map_fallible` that follows could be run against an arrangement itself.
        //
        // Note(btv): If we ever do that, we would then only need to make the raw collection here
        // if `collections.raw` is true.

        // Creating an arrangement that already exists indicates an LIR planning bug;
        // log it (softly) rather than panicking, since the duplicate is skipped below.
        for (key, _, _) in collections.arranged.iter() {
            soft_assert_or_log!(
                !self.arranged.contains_key(key),
                "LIR ArrangeBy tried to create an existing arrangement"
            );
        }

        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
        let form_raw_collection = collections.raw
            || collections
                .arranged
                .iter()
                .any(|(key, _, _)| !self.arranged.contains_key(key));
        if form_raw_collection && self.collection.is_none() {
            self.collection = Some(self.as_collection_core(
                input_mfp,
                input_key.map(|k| (k, None)),
                until,
                config_set,
            ));
        }
        for (key, _, thinning) in collections.arranged {
            if !self.arranged.contains_key(&key) {
                // TODO: Consider allowing more expressive names.
                let name = format!("ArrangeBy[{:?}]", key);

                // Take the cached collection; `arrange_collection` hands back a
                // passthrough stream that we re-install as `self.collection` below,
                // so later iterations (and callers) can keep using it without teeing.
                let (oks, errs) = self
                    .collection
                    .take()
                    .expect("Collection constructed above");
                let (oks, errs_keyed, passthrough) =
                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
                // Errors from forming the key are merged with pre-existing errors for
                // the arrangement's error trace; the raw collection keeps only `errs`.
                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
                self.collection = Some((passthrough, errs));
                let errs =
                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
                        &format!("{}-errors", name),
                    );
                self.arranged
                    .insert(key, ArrangementFlavor::Local(oks, errs));
            }
        }
        self
    }
798
799    /// Builds an arrangement from a collection, using the specified key and value thinning.
800    ///
801    /// The arrangement's key is based on the `key` expressions, and the value the input with
802    /// the `thinning` applied to it. It selects which of the input columns are included in the
803    /// value of the arrangement. The thinning is in support of permuting arrangements such that
804    /// columns in the key are not included in the value.
805    ///
806    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
807    /// the input as-is, which allows downstream consumers to reuse the collection without
808    /// teeing the stream.
809    fn arrange_collection(
810        name: &String,
811        oks: VecCollection<'scope, T, Row, Diff>,
812        key: Vec<MirScalarExpr>,
813        thinning: Vec<usize>,
814    ) -> (
815        Arranged<'scope, RowRowAgent<T, Diff>>,
816        VecCollection<'scope, T, DataflowError, Diff>,
817        VecCollection<'scope, T, Row, Diff>,
818    ) {
819        // This operator implements a `map_fallible`, but produces columnar updates for the ok
820        // stream. The `map_fallible` cannot be used here because the closure cannot return
821        // references, which is what we need to push into columnar streams. Instead, we use a
822        // bespoke operator that also optimizes reuse of allocations across individual updates.
823        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
824        let (ok_output, ok_stream) = builder.new_output();
825        let mut ok_output =
826            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
827        let (err_output, err_stream) = builder.new_output();
828        let mut err_output = OutputBuilder::from(err_output);
829        let (passthrough_output, passthrough_stream) = builder.new_output();
830        let mut passthrough_output = OutputBuilder::from(passthrough_output);
831        let mut input = builder.new_input(oks.inner, Pipeline);
832        builder.set_notify_for(0, FrontierInterest::Never);
833        builder.build(move |_capabilities| {
834            let mut key_buf = Row::default();
835            let mut val_buf = Row::default();
836            let mut datums = DatumVec::new();
837            let mut temp_storage = RowArena::new();
838            move |_frontiers| {
839                let mut ok_output = ok_output.activate();
840                let mut err_output = err_output.activate();
841                let mut passthrough_output = passthrough_output.activate();
842                input.for_each(|time, data| {
843                    let mut ok_session = ok_output.session_with_builder(&time);
844                    let mut err_session = err_output.session(&time);
845                    for (row, time, diff) in data.iter() {
846                        temp_storage.clear();
847                        let datums = datums.borrow_with(row);
848                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
849                        match key_buf.packer().try_extend(key_iter) {
850                            Ok(()) => {
851                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
852                                val_buf.packer().extend(val_datum_iter);
853                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
854                            }
855                            Err(e) => {
856                                err_session.give((e.into(), time.clone(), *diff));
857                            }
858                        }
859                    }
860                    passthrough_output.session(&time).give_container(data);
861                });
862            }
863        });
864
865        let oks = ok_stream
866            .mz_arrange_core::<
867                _,
868                Col2ValBatcher<_, _, _, _>,
869                RowRowBuilder<_, _>,
870                RowRowSpine<_, _>,
871            >(
872                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
873                    columnar_exchange::<Row, Row, T, Diff>,
874                ),
875                name
876            );
877        (
878            oks,
879            err_stream.as_collection(),
880            passthrough_stream.as_collection(),
881        )
882    }
883}
884
/// A batch of arrangement data awaiting (possibly fueled, incremental) processing.
struct PendingWork<C>
where
    C: Cursor,
{
    // Capability at which results read from `cursor` may be emitted.
    capability: Capability<C::Time>,
    // Cursor into `batch`, retaining its position across `do_work` calls.
    cursor: C,
    // The storage backing `cursor`.
    batch: C::Storage,
}
893
impl<C> PendingWork<C>
where
    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// If `key` is `Some`, only that key's values are processed (seeking to it if
    /// necessary); otherwise the cursor scans all remaining keys. Work is measured in
    /// records emitted; when it reaches `fuel`, `fuel` is set to `0` and the method
    /// returns early, leaving the cursor positioned for a later call. Otherwise the
    /// work performed is subtracted from `fuel`.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for (time, diff) pairs of the current value, consolidated
        // before `logic` runs so each distinct time is presented at most once.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            let key = C::KeyContainer::reborrow(*key);
            // Seek only if the cursor is not already on the requested key.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // The key may be absent from this batch; process it only if found.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: stop here; the cursor resumes at the next value.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // No specific key requested: scan every remaining (key, val) pair.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    self.cursor.step_val(&self.batch);
                    // Out of fuel: stop here; the cursor resumes at the next value.
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Completed without exhausting fuel; charge the work actually performed.
        // (The early returns above guarantee `work < *fuel` here.)
        *fuel -= work;
    }
}
973}