mz_compute/render/
context.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Management of dataflow-local state, like arrangements, while building a
//! dataflow.
12
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::rc::Rc;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::operators::arrange::Arranged;
19use differential_dataflow::trace::implementations::BatchContainer;
20use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
21use differential_dataflow::{AsCollection, Collection, Data};
22use mz_compute_types::dataflows::DataflowDescription;
23use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
24use mz_compute_types::plan::AvailableCollections;
25use mz_dyncfg::ConfigSet;
26use mz_expr::{Id, MapFilterProject, MirScalarExpr};
27use mz_ore::soft_assert_or_log;
28use mz_repr::fixed_length::ToDatumIter;
29use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
30use mz_storage_types::controller::CollectionMetadata;
31use mz_storage_types::errors::DataflowError;
32use mz_timely_util::builder_async::{ButtonHandle, PressOnDropButton};
33use mz_timely_util::columnar::builder::ColumnBuilder;
34use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
35use mz_timely_util::operator::{CollectionExt, StreamExt};
36use timely::container::CapacityContainerBuilder;
37use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
38use timely::dataflow::operators::Capability;
39use timely::dataflow::operators::generic::OutputHandleCore;
40use timely::dataflow::scopes::Child;
41use timely::dataflow::{Scope, Stream};
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::compute_state::ComputeState;
46use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
47use crate::render::errors::ErrorLogger;
48use crate::render::{LinearJoinSpec, RenderTimestamp};
49use crate::row_spine::{DatumSeq, RowRowBuilder};
50use crate::typedefs::{
51    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
52    RowRowSpine,
53};
54
55/// Dataflow-local collections and arrangements.
56///
57/// A context means to wrap available data assets and present them in an easy-to-use manner.
58/// These assets include dataflow-local collections and arrangements, as well as imported
59/// arrangements from outside the dataflow.
60///
61/// Context has two timestamp types, one from `S::Timestamp` and one from `T`, where the
62/// former must refine the latter. The former is the timestamp used by the scope in question,
63/// and the latter is the timestamp of imported traces. The two may be different in the case
64/// of regions or iteration.
65pub struct Context<S: Scope, T = mz_repr::Timestamp>
66where
67    T: MzTimestamp,
68    S::Timestamp: MzTimestamp + Refines<T>,
69{
70    /// The scope within which all managed collections exist.
71    ///
72    /// It is an error to add any collections not contained in this scope.
73    pub(crate) scope: S,
74    /// The debug name of the dataflow associated with this context.
75    pub debug_name: String,
76    /// The Timely ID of the dataflow associated with this context.
77    pub dataflow_id: usize,
78    /// The collection IDs of exports of the dataflow associated with this context.
79    pub export_ids: Vec<GlobalId>,
80    /// Frontier before which updates should not be emitted.
81    ///
82    /// We *must* apply it to sinks, to ensure correct outputs.
83    /// We *should* apply it to sources and imported traces, because it improves performance.
84    pub as_of_frontier: Antichain<T>,
85    /// Frontier after which updates should not be emitted.
86    /// Used to limit the amount of work done when appropriate.
87    pub until: Antichain<T>,
88    /// Bindings of identifiers to collections.
89    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
90    /// A handle that operators can probe to know whether the dataflow is shutting down.
91    pub(super) shutdown_probe: ShutdownProbe,
92    /// The logger, from Timely's logging framework, if logs are enabled.
93    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
94    /// Specification for rendering linear joins.
95    pub(super) linear_join_spec: LinearJoinSpec,
96    /// The expiration time for dataflows in this context. The output's frontier should never advance
97    /// past this frontier, except the empty frontier.
98    pub dataflow_expiration: Antichain<T>,
99    /// The config set for this context.
100    pub config_set: Rc<ConfigSet>,
101}
102
103impl<S: Scope> Context<S>
104where
105    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
106{
107    /// Creates a new empty Context.
108    pub fn for_dataflow_in<Plan>(
109        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
110        scope: S,
111        compute_state: &ComputeState,
112        until: Antichain<mz_repr::Timestamp>,
113        dataflow_expiration: Antichain<mz_repr::Timestamp>,
114    ) -> Self {
115        use mz_ore::collections::CollectionExt as IteratorExt;
116        let dataflow_id = *scope.addr().into_first();
117        let as_of_frontier = dataflow
118            .as_of
119            .clone()
120            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
121
122        let export_ids = dataflow.export_ids().collect();
123
124        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
125        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
126        // want to reconsider in the future.
127        let compute_logger = if dataflow.is_transient() {
128            None
129        } else {
130            compute_state.compute_logger.clone()
131        };
132
133        Self {
134            scope,
135            debug_name: dataflow.debug_name.clone(),
136            dataflow_id,
137            export_ids,
138            as_of_frontier,
139            until,
140            bindings: BTreeMap::new(),
141            shutdown_probe: Default::default(),
142            compute_logger,
143            linear_join_spec: compute_state.linear_join_spec,
144            dataflow_expiration,
145            config_set: Rc::clone(&compute_state.worker_config),
146        }
147    }
148}
149
150impl<S: Scope, T> Context<S, T>
151where
152    T: MzTimestamp,
153    S::Timestamp: MzTimestamp + Refines<T>,
154{
155    /// Insert a collection bundle by an identifier.
156    ///
157    /// This is expected to be used to install external collections (sources, indexes, other views),
158    /// as well as for `Let` bindings of local collections.
159    pub fn insert_id(
160        &mut self,
161        id: Id,
162        collection: CollectionBundle<S, T>,
163    ) -> Option<CollectionBundle<S, T>> {
164        self.bindings.insert(id, collection)
165    }
166    /// Remove a collection bundle by an identifier.
167    ///
168    /// The primary use of this method is uninstalling `Let` bindings.
169    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
170        self.bindings.remove(&id)
171    }
172    /// Melds a collection bundle to whatever exists.
173    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
174        if !self.bindings.contains_key(&id) {
175            self.bindings.insert(id, collection);
176        } else {
177            let binding = self
178                .bindings
179                .get_mut(&id)
180                .expect("Binding verified to exist");
181            if collection.collection.is_some() {
182                binding.collection = collection.collection;
183            }
184            for (key, flavor) in collection.arranged.into_iter() {
185                binding.arranged.insert(key, flavor);
186            }
187        }
188    }
189    /// Look up a collection bundle by an identifier.
190    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
191        self.bindings.get(&id).cloned()
192    }
193
194    pub(super) fn error_logger(&self) -> ErrorLogger {
195        ErrorLogger::new(self.shutdown_probe.clone(), self.debug_name.clone())
196    }
197}
198
199impl<S: Scope, T> Context<S, T>
200where
201    T: MzTimestamp,
202    S::Timestamp: MzTimestamp + Refines<T>,
203{
204    /// Brings the underlying arrangements and collections into a region.
205    pub fn enter_region<'a>(
206        &self,
207        region: &Child<'a, S, S::Timestamp>,
208        bindings: Option<&std::collections::BTreeSet<Id>>,
209    ) -> Context<Child<'a, S, S::Timestamp>, T> {
210        let bindings = self
211            .bindings
212            .iter()
213            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
214            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
215            .collect();
216
217        Context {
218            scope: region.clone(),
219            debug_name: self.debug_name.clone(),
220            dataflow_id: self.dataflow_id.clone(),
221            export_ids: self.export_ids.clone(),
222            as_of_frontier: self.as_of_frontier.clone(),
223            until: self.until.clone(),
224            shutdown_probe: self.shutdown_probe.clone(),
225            compute_logger: self.compute_logger.clone(),
226            linear_join_spec: self.linear_join_spec.clone(),
227            bindings,
228            dataflow_expiration: self.dataflow_expiration.clone(),
229            config_set: Rc::clone(&self.config_set),
230        }
231    }
232}
233
234pub(super) fn shutdown_token<G: Scope>(scope: &mut G) -> (ShutdownProbe, PressOnDropButton) {
235    let (button_handle, button) = mz_timely_util::builder_async::button(scope, scope.addr());
236    let probe = ShutdownProbe::new(button_handle);
237    let token = button.press_on_drop();
238    (probe, token)
239}
240
241/// Convenient wrapper around an optional `ButtonHandle` that can be used to check whether a
242/// dataflow is shutting down.
243///
244/// Instances created through the `Default` impl act as if the dataflow never shuts down.
245/// Instances created through [`ShutdownProbe::new`] defer to the wrapped button.
246#[derive(Clone, Default)]
247pub(super) struct ShutdownProbe(Option<Rc<RefCell<ButtonHandle>>>);
248
249impl ShutdownProbe {
250    /// Construct a `ShutdownProbe` instance that defers to `button`.
251    fn new(button: ButtonHandle) -> Self {
252        Self(Some(Rc::new(RefCell::new(button))))
253    }
254
255    /// Returns whether the dataflow is in the process of shutting down.
256    ///
257    /// The result of this method is synchronized among workers: It only returns `true` once all
258    /// workers have dropped their shutdown token.
259    pub(super) fn in_shutdown(&self) -> bool {
260        match &self.0 {
261            Some(t) => t.borrow_mut().all_pressed(),
262            None => false,
263        }
264    }
265
266    /// Returns whether the dataflow is in the process of shutting down on the current worker.
267    ///
268    /// In contrast to [`ShutdownProbe::in_shutdown`], this method returns `true` as soon as the
269    /// current worker has dropped its shutdown token, without waiting for other workers.
270    pub(super) fn in_local_shutdown(&self) -> bool {
271        match &self.0 {
272            Some(t) => t.borrow_mut().local_pressed(),
273            None => false,
274        }
275    }
276}
277
278/// Describes flavor of arrangement: local or imported trace.
279#[derive(Clone)]
280pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
281where
282    T: MzTimestamp,
283    S::Timestamp: MzTimestamp + Refines<T>,
284{
285    /// A dataflow-local arrangement.
286    Local(
287        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
288        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
289    ),
290    /// An imported trace from outside the dataflow.
291    ///
292    /// The `GlobalId` identifier exists so that exports of this same trace
293    /// can refer back to and depend on the original instance.
294    Trace(
295        GlobalId,
296        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
297        Arranged<S, ErrEnter<T, S::Timestamp>>,
298    ),
299}
300
301impl<S: Scope, T> ArrangementFlavor<S, T>
302where
303    T: MzTimestamp,
304    S::Timestamp: MzTimestamp + Refines<T>,
305{
306    /// Presents `self` as a stream of updates.
307    ///
308    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
309    ///
310    /// This method presents the contents as they are, without further computation.
311    /// If you have logic that could be applied to each record, consider using the
312    /// `flat_map` methods which allows this and can reduce the work done.
313    #[deprecated(note = "Use `flat_map` instead.")]
314    pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
315        let mut datums = DatumVec::new();
316        let logic = move |k: DatumSeq, v: DatumSeq| {
317            let mut datums_borrow = datums.borrow();
318            datums_borrow.extend(k);
319            datums_borrow.extend(v);
320            SharedRow::pack(&**datums_borrow)
321        };
322        match &self {
323            ArrangementFlavor::Local(oks, errs) => (
324                oks.as_collection(logic),
325                errs.as_collection(|k, &()| k.clone()),
326            ),
327            ArrangementFlavor::Trace(_, oks, errs) => (
328                oks.as_collection(logic),
329                errs.as_collection(|k, &()| k.clone()),
330            ),
331        }
332    }
333
334    /// Constructs and applies logic to elements of `self` and returns the results.
335    ///
336    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
337    /// an iterator of `(D, S::Timestamp, Diff)` updates.
338    ///
339    /// If `key` is set, this is a promise that `logic` will produce no results on
340    /// records for which the key does not evaluate to the value. This is used to
341    /// leap directly to exactly those records.
342    ///
343    /// The `max_demand` parameter limits the number of columns decoded from the
344    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
345    /// decode all columns.
346    pub fn flat_map<D, I, L>(
347        &self,
348        key: Option<&Row>,
349        max_demand: usize,
350        mut logic: L,
351    ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
352    where
353        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
354        D: Data,
355        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
356    {
357        // Set a number of tuples after which the operator should yield.
358        // This allows us to remain responsive even when enumerating a substantial
359        // arrangement, as well as provides time to accumulate our produced output.
360        let refuel = 1000000;
361
362        let mut datums = DatumVec::new();
363        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
364            let mut datums_borrow = datums.borrow();
365            datums_borrow.extend(k.to_datum_iter().take(max_demand));
366            let max_demand = max_demand.saturating_sub(datums_borrow.len());
367            datums_borrow.extend(v.to_datum_iter().take(max_demand));
368            logic(&mut datums_borrow, t, d)
369        };
370
371        match &self {
372            ArrangementFlavor::Local(oks, errs) => {
373                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
374                let errs = errs.as_collection(|k, &()| k.clone());
375                (oks, errs)
376            }
377            ArrangementFlavor::Trace(_, oks, errs) => {
378                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
379                let errs = errs.as_collection(|k, &()| k.clone());
380                (oks, errs)
381            }
382        }
383    }
384}
385impl<S: Scope, T> ArrangementFlavor<S, T>
386where
387    T: MzTimestamp,
388    S::Timestamp: MzTimestamp + Refines<T>,
389{
390    /// The scope containing the collection bundle.
391    pub fn scope(&self) -> S {
392        match self {
393            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
394            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
395        }
396    }
397
398    /// Brings the arrangement flavor into a region.
399    pub fn enter_region<'a>(
400        &self,
401        region: &Child<'a, S, S::Timestamp>,
402    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
403        match self {
404            ArrangementFlavor::Local(oks, errs) => {
405                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
406            }
407            ArrangementFlavor::Trace(gid, oks, errs) => {
408                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
409            }
410        }
411    }
412}
413impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
414where
415    T: MzTimestamp,
416    S::Timestamp: MzTimestamp + Refines<T>,
417{
418    /// Extracts the arrangement flavor from a region.
419    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
420        match self {
421            ArrangementFlavor::Local(oks, errs) => {
422                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
423            }
424            ArrangementFlavor::Trace(gid, oks, errs) => {
425                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
426            }
427        }
428    }
429}
430
431/// A bundle of the various ways a collection can be represented.
432///
433/// This type maintains the invariant that it does contain at least one valid
434/// source of data, either a collection or at least one arrangement.
435#[derive(Clone)]
436pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
437where
438    T: MzTimestamp,
439    S::Timestamp: MzTimestamp + Refines<T>,
440{
441    pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
442    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
443}
444
445impl<S: Scope, T> CollectionBundle<S, T>
446where
447    T: MzTimestamp,
448    S::Timestamp: MzTimestamp + Refines<T>,
449{
450    /// Construct a new collection bundle from update streams.
451    pub fn from_collections(
452        oks: Collection<S, Row, Diff>,
453        errs: Collection<S, DataflowError, Diff>,
454    ) -> Self {
455        Self {
456            collection: Some((oks, errs)),
457            arranged: BTreeMap::default(),
458        }
459    }
460
461    /// Inserts arrangements by the expressions on which they are keyed.
462    pub fn from_expressions(
463        exprs: Vec<MirScalarExpr>,
464        arrangements: ArrangementFlavor<S, T>,
465    ) -> Self {
466        let mut arranged = BTreeMap::new();
467        arranged.insert(exprs, arrangements);
468        Self {
469            collection: None,
470            arranged,
471        }
472    }
473
474    /// Inserts arrangements by the columns on which they are keyed.
475    pub fn from_columns<I: IntoIterator<Item = usize>>(
476        columns: I,
477        arrangements: ArrangementFlavor<S, T>,
478    ) -> Self {
479        let mut keys = Vec::new();
480        for column in columns {
481            keys.push(MirScalarExpr::column(column));
482        }
483        Self::from_expressions(keys, arrangements)
484    }
485
486    /// The scope containing the collection bundle.
487    pub fn scope(&self) -> S {
488        if let Some((oks, _errs)) = &self.collection {
489            oks.inner.scope()
490        } else {
491            self.arranged
492                .values()
493                .next()
494                .expect("Must contain a valid collection")
495                .scope()
496        }
497    }
498
499    /// Brings the collection bundle into a region.
500    pub fn enter_region<'a>(
501        &self,
502        region: &Child<'a, S, S::Timestamp>,
503    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
504        CollectionBundle {
505            collection: self
506                .collection
507                .as_ref()
508                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
509            arranged: self
510                .arranged
511                .iter()
512                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
513                .collect(),
514        }
515    }
516}
517
518impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
519where
520    T: MzTimestamp,
521    S::Timestamp: MzTimestamp + Refines<T>,
522{
523    /// Extracts the collection bundle from a region.
524    pub fn leave_region(&self) -> CollectionBundle<S, T> {
525        CollectionBundle {
526            collection: self
527                .collection
528                .as_ref()
529                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
530            arranged: self
531                .arranged
532                .iter()
533                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
534                .collect(),
535        }
536    }
537}
538
539impl<S: Scope, T> CollectionBundle<S, T>
540where
541    T: MzTimestamp,
542    S::Timestamp: MzTimestamp + Refines<T>,
543{
544    /// Asserts that the arrangement for a specific key
545    /// (or the raw collection for no key) exists,
546    /// and returns the corresponding collection.
547    ///
548    /// This returns the collection as-is, without
549    /// doing any unthinning transformation.
550    /// Therefore, it should be used when the appropriate transformation
551    /// was planned as part of a following MFP.
552    ///
553    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
554    /// the fueled `flat_map` or `as_collection` method, depending on the flag
555    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
556    pub fn as_specific_collection(
557        &self,
558        key: Option<&[MirScalarExpr]>,
559        config_set: &ConfigSet,
560    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
561        // Any operator that uses this method was told to use a particular
562        // collection during LIR planning, where we should have made
563        // sure that that collection exists.
564        //
565        // If it doesn't, we panic.
566        match key {
567            None => self
568                .collection
569                .clone()
570                .expect("The unarranged collection doesn't exist."),
571            Some(key) => {
572                let arranged = self.arranged.get(key).unwrap_or_else(|| {
573                    panic!("The collection arranged by {:?} doesn't exist.", key)
574                });
575                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
576                    // Decode all columns, pass max_demand as usize::MAX.
577                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
578                        Some((SharedRow::pack(borrow.iter()), t, r))
579                    });
580                    (ok.as_collection(), err)
581                } else {
582                    #[allow(deprecated)]
583                    arranged.as_collection()
584                }
585            }
586        }
587    }
588
589    /// Constructs and applies logic to elements of a collection and returns the results.
590    ///
591    /// The function applies `logic` on elements. The logic conceptually receives
592    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
593    ///
594    /// If `key_val` is set, this is a promise that `logic` will produce no results on
595    /// records for which the key does not evaluate to the value. This is used when we
596    /// have an arrangement by that key to leap directly to exactly those records.
597    /// It is important that `logic` still guard against data that does not satisfy
598    /// this constraint, as this method does not statically know that it will have
599    /// that arrangement.
600    ///
601    /// The `max_demand` parameter limits the number of columns decoded from the
602    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
603    /// decode all columns.
604    pub fn flat_map<D, I, L>(
605        &self,
606        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
607        max_demand: usize,
608        mut logic: L,
609    ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
610    where
611        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
612        D: Data,
613        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
614    {
615        // If `key_val` is set, we should have to use the corresponding arrangement.
616        // If there isn't one, that implies an error in the contract between
617        // key-production and available arrangements.
618        if let Some((key, val)) = key_val {
619            self.arrangement(&key)
620                .expect("Should have ensured during planning that this arrangement exists.")
621                .flat_map(val.as_ref(), max_demand, logic)
622        } else {
623            use timely::dataflow::operators::Map;
624            let (oks, errs) = self
625                .collection
626                .clone()
627                .expect("Invariant violated: CollectionBundle contains no collection.");
628            let mut datums = DatumVec::new();
629            let oks = oks.inner.flat_map(move |(v, t, d)| {
630                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
631            });
632            (oks, errs)
633        }
634    }
635
636    /// Factored out common logic for using literal keys in general traces.
637    ///
638    /// This logic is sufficiently interesting that we want to write it only
639    /// once, and thereby avoid any skew in the two uses of the logic.
640    ///
641    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
642    /// where key and value are potentially specialized, but convertible into rows.
643    fn flat_map_core<Tr, D, I, L>(
644        trace: &Arranged<S, Tr>,
645        key: Option<&Tr::KeyOwn>,
646        mut logic: L,
647        refuel: usize,
648    ) -> Stream<S, I::Item>
649    where
650        Tr: for<'a> TraceReader<
651                Key<'a>: ToDatumIter,
652                KeyOwn: PartialEq,
653                Val<'a>: ToDatumIter,
654                Time = S::Timestamp,
655                Diff = mz_repr::Diff,
656            > + Clone
657            + 'static,
658        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
659        D: Data,
660        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
661    {
662        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
663
664        let mut key_con = Tr::KeyContainer::with_capacity(1);
665        if let Some(key) = &key {
666            key_con.push_own(key);
667        }
668        let mode = if key.is_some() { "index" } else { "scan" };
669        let name = format!("ArrangementFlatMap({})", mode);
670        use timely::dataflow::operators::Operator;
671        trace
672            .stream
673            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
674                // Acquire an activator to reschedule the operator when it has unfinished work.
675                let activator = trace.stream.scope().activator_for(info.address);
676                // Maintain a list of work to do, cursor to navigate and process.
677                let mut todo = std::collections::VecDeque::new();
678                move |input, output| {
679                    let key = key_con.get(0);
680                    // First, dequeue all batches.
681                    input.for_each(|time, data| {
682                        let capability = time.retain();
683                        for batch in data.iter() {
684                            // enqueue a capability, cursor, and batch.
685                            todo.push_back(PendingWork::new(
686                                capability.clone(),
687                                batch.cursor(),
688                                batch.clone(),
689                            ));
690                        }
691                    });
692
693                    // Second, make progress on `todo`.
694                    let mut fuel = refuel;
695                    while !todo.is_empty() && fuel > 0 {
696                        todo.front_mut().unwrap().do_work(
697                            key.as_ref(),
698                            &mut logic,
699                            &mut fuel,
700                            output,
701                        );
702                        if fuel > 0 {
703                            todo.pop_front();
704                        }
705                    }
706                    // If we have not finished all work, re-activate the operator.
707                    if !todo.is_empty() {
708                        activator.activate();
709                    }
710                }
711            })
712    }
713
714    /// Look up an arrangement by the expressions that form the key.
715    ///
716    /// The result may be `None` if no such arrangement exists, or it may be one of many
717    /// "arrangement flavors" that represent the types of arranged data we might have.
718    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
719        self.arranged.get(key).map(|x| x.clone())
720    }
721}
722
723impl<S, T> CollectionBundle<S, T>
724where
725    T: MzTimestamp,
726    S: Scope,
727    S::Timestamp: Refines<T> + RenderTimestamp,
728{
729    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
730    ///
731    /// This operator is able to apply the logic of `mfp` early, which can substantially
732    /// reduce the amount of data produced when `mfp` is non-trivial.
733    ///
734    /// The `key_val` argument, when present, indicates that a specific arrangement should
735    /// be used, and if, in addition, the `val` component is present,
736    /// that we can seek to the supplied row.
737    pub fn as_collection_core(
738        &self,
739        mut mfp: MapFilterProject,
740        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
741        until: Antichain<mz_repr::Timestamp>,
742        config_set: &ConfigSet,
743    ) -> (
744        Collection<S, mz_repr::Row, Diff>,
745        Collection<S, DataflowError, Diff>,
746    ) {
747        mfp.optimize();
748        let mfp_plan = mfp.clone().into_plan().unwrap();
749
750        // If the MFP is trivial, we can just call `as_collection`.
751        // In the case that we weren't going to apply the `key_val` optimization,
752        // this path results in a slightly smaller and faster
753        // dataflow graph, and is intended to fix
754        // https://github.com/MaterializeInc/database-issues/issues/3111
755        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
756            true
757        } else {
758            false
759        };
760
761        if mfp_plan.is_identity() && !has_key_val {
762            let key = key_val.map(|(k, _v)| k);
763            return self.as_specific_collection(key.as_deref(), config_set);
764        }
765
766        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
767        mfp.permute_fn(|c| c, max_demand);
768        mfp.optimize();
769        let mfp_plan = mfp.into_plan().unwrap();
770
771        let mut datum_vec = DatumVec::new();
772        // Wrap in an `Rc` so that lifetimes work out.
773        let until = std::rc::Rc::new(until);
774
775        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
776            let mut row_builder = SharedRow::get();
777            let until = std::rc::Rc::clone(&until);
778            let temp_storage = RowArena::new();
779            let row_iter = row_datums.iter();
780            let mut datums_local = datum_vec.borrow();
781            datums_local.extend(row_iter);
782            let time = time.clone();
783            let event_time = time.event_time();
784            mfp_plan
785                .evaluate(
786                    &mut datums_local,
787                    &temp_storage,
788                    event_time,
789                    diff.clone(),
790                    move |time| !until.less_equal(time),
791                    &mut row_builder,
792                )
793                .map(move |x| match x {
794                    Ok((row, event_time, diff)) => {
795                        // Copy the whole time, and re-populate event time.
796                        let mut time: S::Timestamp = time.clone();
797                        *time.event_time_mut() = event_time;
798                        (Ok(row), time, diff)
799                    }
800                    Err((e, event_time, diff)) => {
801                        // Copy the whole time, and re-populate event time.
802                        let mut time: S::Timestamp = time.clone();
803                        *time.event_time_mut() = event_time;
804                        (Err(e), time, diff)
805                    }
806                })
807        });
808
809        use differential_dataflow::AsCollection;
810        let (oks, errs) = stream
811            .as_collection()
812            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
813                "OkErr",
814                |x| x,
815            );
816
817        (oks, errors.concat(&errs))
818    }
819    pub fn ensure_collections(
820        mut self,
821        collections: AvailableCollections,
822        input_key: Option<Vec<MirScalarExpr>>,
823        input_mfp: MapFilterProject,
824        until: Antichain<mz_repr::Timestamp>,
825        config_set: &ConfigSet,
826    ) -> Self {
827        if collections == Default::default() {
828            return self;
829        }
830        // Cache collection to avoid reforming it each time.
831        //
832        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
833        // as the `map_fallible` that follows could be run against an arrangement itself.
834        //
835        // Note(btv): If we ever do that, we would then only need to make the raw collection here
836        // if `collections.raw` is true.
837
838        for (key, _, _) in collections.arranged.iter() {
839            soft_assert_or_log!(
840                !self.arranged.contains_key(key),
841                "LIR ArrangeBy tried to create an existing arrangement"
842            );
843        }
844
845        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
846        let form_raw_collection = collections.raw
847            || collections
848                .arranged
849                .iter()
850                .any(|(key, _, _)| !self.arranged.contains_key(key));
851        if form_raw_collection && self.collection.is_none() {
852            self.collection = Some(self.as_collection_core(
853                input_mfp,
854                input_key.map(|k| (k, None)),
855                until,
856                config_set,
857            ));
858        }
859        for (key, _, thinning) in collections.arranged {
860            if !self.arranged.contains_key(&key) {
861                // TODO: Consider allowing more expressive names.
862                let name = format!("ArrangeBy[{:?}]", key);
863
864                let (oks, errs) = self
865                    .collection
866                    .clone()
867                    .expect("Collection constructed above");
868                let (oks, errs_keyed) =
869                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
870                let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
871                let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
872                    &format!("{}-errors", name),
873                );
874                self.arranged
875                    .insert(key, ArrangementFlavor::Local(oks, errs));
876            }
877        }
878        self
879    }
880
881    /// Builds an arrangement from a collection, using the specified key and value thinning.
882    ///
883    /// The arrangement's key is based on the `key` expressions, and the value the input with
884    /// the `thinning` applied to it. It selects which of the input columns are included in the
885    /// value of the arrangement. The thinning is in support of permuting arrangements such that
886    /// columns in the key are not included in the value.
887    fn arrange_collection(
888        name: &String,
889        oks: Collection<S, Row, Diff>,
890        key: Vec<MirScalarExpr>,
891        thinning: Vec<usize>,
892    ) -> (
893        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
894        Collection<S, DataflowError, Diff>,
895    ) {
896        // The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
897        // for the ok stream. The `map_fallible` cannot be used here because the closure cannot
898        // return references, which is what we need to push into columnar streams. Instead, we use
899        // a bespoke operator that also optimizes reuse of allocations across individual updates.
900        let (oks, errs) = oks
901            .inner
902            .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
903                Pipeline,
904                "FormArrangementKey",
905                move |_, _| {
906                    Box::new(move |input, ok, err| {
907                        let mut key_buf = Row::default();
908                        let mut val_buf = Row::default();
909                        let mut datums = DatumVec::new();
910                        let mut temp_storage = RowArena::new();
911                        while let Some((time, data)) = input.next() {
912                            let mut ok_session = ok.session_with_builder(&time);
913                            let mut err_session = err.session(&time);
914                            for (row, time, diff) in data.iter() {
915                                temp_storage.clear();
916                                let datums = datums.borrow_with(row);
917                                let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
918                                match key_buf.packer().try_extend(key_iter) {
919                                    Ok(()) => {
920                                        let val_datum_iter = thinning.iter().map(|c| datums[*c]);
921                                        val_buf.packer().extend(val_datum_iter);
922                                        ok_session.give(((&*key_buf, &*val_buf), time, diff));
923                                    }
924                                    Err(e) => {
925                                        err_session.give((e.into(), time.clone(), *diff));
926                                    }
927                                }
928                            }
929                        }
930                    })
931                },
932            );
933        let oks = oks
934            .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
935                ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
936            );
937        (oks, errs.as_collection())
938    }
939}
940
941struct PendingWork<C>
942where
943    C: Cursor,
944{
945    capability: Capability<C::Time>,
946    cursor: C,
947    batch: C::Storage,
948}
949
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// When `key` is `Some`, only the values stored under that key are visited. The cursor seeks
    /// to the key if it is not already positioned on it, and nothing is emitted if the batch does
    /// not contain the key. When `key` is `None`, the cursor walks every remaining key and value
    /// in the batch.
    ///
    /// For each value, its `(time, diff)` updates are collected and consolidated (via
    /// differential's `consolidate`) before being passed to `logic`. Every item `logic` yields is
    /// given to `output` in a session at `self.capability`.
    ///
    /// Fuel accounting: `work` counts emitted items, and the budget is checked only after a value
    /// has been fully processed, so a call may overshoot `fuel`. When the budget is reached,
    /// `*fuel` is set to `0` and the method returns. The cursor has already been stepped past the
    /// processed value, so a later call resumes from where this one stopped. If traversal ends
    /// first, `*fuel` is reduced by the work done, leaving it positive when it was positive on
    /// entry.
    ///
    /// NOTE(review): values for which `logic` yields nothing do not consume fuel, so a highly
    /// selective `logic` may traverse a large batch in a single call.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputHandleCore<
            '_,
            C::Time,
            ConsolidatingContainerBuilder<Vec<I::Item>>,
            timely::dataflow::channels::pushers::Tee<C::Time, Vec<I::Item>>,
        >,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for one value's `(time, diff)` updates; drained after each value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            // Re-borrow the caller's key via `BatchContainer::reborrow` (presumably to shorten its
            // lifetime) so it can be compared against keys read from the cursor.
            let key = C::KeyContainer::reborrow(*key);
            // Only seek when the cursor is not already on `key`, e.g. when resuming a key whose
            // values were partially processed by an earlier call.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // After seeking, the cursor may sit on a different key if `key` is absent.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Step before the fuel check so a resumed call starts at the next value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            // Unkeyed: visit every remaining key and each of its values.
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Step before the fuel check so a resumed call starts at the next value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Cannot underflow: every increment of `work` is followed by a `work >= *fuel` check.
        *fuel -= work;
    }
}