mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::rc::Rc;
16use std::sync::mpsc;
17
18use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
19use differential_dataflow::operators::arrange::Arranged;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
22use differential_dataflow::{AsCollection, Collection, Data};
23use mz_compute_types::dataflows::DataflowDescription;
24use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
25use mz_compute_types::plan::{AvailableCollections, LirId};
26use mz_dyncfg::ConfigSet;
27use mz_expr::{Id, MapFilterProject, MirScalarExpr};
28use mz_ore::soft_assert_or_log;
29use mz_repr::fixed_length::ToDatumIter;
30use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
31use mz_storage_types::controller::CollectionMetadata;
32use mz_storage_types::errors::DataflowError;
33use mz_timely_util::builder_async::{ButtonHandle, PressOnDropButton};
34use mz_timely_util::columnar::builder::ColumnBuilder;
35use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
36use mz_timely_util::operator::{CollectionExt, StreamExt};
37use timely::Container;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
40use timely::dataflow::operators::Capability;
41use timely::dataflow::operators::generic::OutputHandleCore;
42use timely::dataflow::scopes::Child;
43use timely::dataflow::{Scope, Stream};
44use timely::progress::timestamp::Refines;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::compute_state::{ComputeState, HydrationEvent};
49use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
50use crate::render::errors::ErrorLogger;
51use crate::render::{LinearJoinSpec, RenderTimestamp};
52use crate::row_spine::{DatumSeq, RowRowBuilder};
53use crate::typedefs::{
54    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
55    RowRowSpine,
56};
57
58/// Dataflow-local collections and arrangements.
59///
60/// A context means to wrap available data assets and present them in an easy-to-use manner.
61/// These assets include dataflow-local collections and arrangements, as well as imported
62/// arrangements from outside the dataflow.
63///
64/// Context has two timestamp types, one from `S::Timestamp` and one from `T`, where the
65/// former must refine the latter. The former is the timestamp used by the scope in question,
66/// and the latter is the timestamp of imported traces. The two may be different in the case
67/// of regions or iteration.
68pub struct Context<S: Scope, T = mz_repr::Timestamp>
69where
70    T: MzTimestamp,
71    S::Timestamp: MzTimestamp + Refines<T>,
72{
73    /// The scope within which all managed collections exist.
74    ///
75    /// It is an error to add any collections not contained in this scope.
76    pub(crate) scope: S,
77    /// The debug name of the dataflow associated with this context.
78    pub debug_name: String,
79    /// The Timely ID of the dataflow associated with this context.
80    pub dataflow_id: usize,
81    /// Frontier before which updates should not be emitted.
82    ///
83    /// We *must* apply it to sinks, to ensure correct outputs.
84    /// We *should* apply it to sources and imported traces, because it improves performance.
85    pub as_of_frontier: Antichain<T>,
86    /// Frontier after which updates should not be emitted.
87    /// Used to limit the amount of work done when appropriate.
88    pub until: Antichain<T>,
89    /// Bindings of identifiers to collections.
90    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
91    /// A handle that operators can probe to know whether the dataflow is shutting down.
92    pub(super) shutdown_probe: ShutdownProbe,
93    /// A logger that operators can use to report hydration events.
94    ///
95    /// `None` if no hydration events should be logged in this context.
96    pub(super) hydration_logger: Option<HydrationLogger>,
97    /// The logger, from Timely's logging framework, if logs are enabled.
98    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
99    /// Specification for rendering linear joins.
100    pub(super) linear_join_spec: LinearJoinSpec,
101    /// The expiration time for dataflows in this context. The output's frontier should never advance
102    /// past this frontier, except the empty frontier.
103    pub dataflow_expiration: Antichain<T>,
104    /// The config set for this context.
105    pub config_set: Rc<ConfigSet>,
106}
107
108impl<S: Scope> Context<S>
109where
110    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
111{
112    /// Creates a new empty Context.
113    pub fn for_dataflow_in<Plan>(
114        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
115        scope: S,
116        compute_state: &ComputeState,
117        until: Antichain<mz_repr::Timestamp>,
118        dataflow_expiration: Antichain<mz_repr::Timestamp>,
119    ) -> Self {
120        use mz_ore::collections::CollectionExt as IteratorExt;
121        let dataflow_id = *scope.addr().into_first();
122        let as_of_frontier = dataflow
123            .as_of
124            .clone()
125            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
126
127        // Skip operator hydration logging for transient dataflows. We do this to avoid overhead
128        // for slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
129        // want to reconsider in the future.
130        //
131        // Similarly, we won't capture a compute_logger for logging LIR->address mappings for transient dataflows.
132        let (hydration_logger, compute_logger) = if dataflow.is_transient() {
133            (None, None)
134        } else {
135            (
136                Some(HydrationLogger {
137                    export_ids: dataflow.export_ids().collect(),
138                    tx: compute_state.hydration_tx.clone(),
139                }),
140                compute_state.compute_logger.clone(),
141            )
142        };
143
144        Self {
145            scope,
146            debug_name: dataflow.debug_name.clone(),
147            dataflow_id,
148            as_of_frontier,
149            until,
150            bindings: BTreeMap::new(),
151            shutdown_probe: Default::default(),
152            hydration_logger,
153            compute_logger,
154            linear_join_spec: compute_state.linear_join_spec,
155            dataflow_expiration,
156            config_set: Rc::clone(&compute_state.worker_config),
157        }
158    }
159}
160
161impl<S: Scope, T> Context<S, T>
162where
163    T: MzTimestamp,
164    S::Timestamp: MzTimestamp + Refines<T>,
165{
166    /// Insert a collection bundle by an identifier.
167    ///
168    /// This is expected to be used to install external collections (sources, indexes, other views),
169    /// as well as for `Let` bindings of local collections.
170    pub fn insert_id(
171        &mut self,
172        id: Id,
173        collection: CollectionBundle<S, T>,
174    ) -> Option<CollectionBundle<S, T>> {
175        self.bindings.insert(id, collection)
176    }
177    /// Remove a collection bundle by an identifier.
178    ///
179    /// The primary use of this method is uninstalling `Let` bindings.
180    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
181        self.bindings.remove(&id)
182    }
183    /// Melds a collection bundle to whatever exists.
184    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
185        if !self.bindings.contains_key(&id) {
186            self.bindings.insert(id, collection);
187        } else {
188            let binding = self
189                .bindings
190                .get_mut(&id)
191                .expect("Binding verified to exist");
192            if collection.collection.is_some() {
193                binding.collection = collection.collection;
194            }
195            for (key, flavor) in collection.arranged.into_iter() {
196                binding.arranged.insert(key, flavor);
197            }
198        }
199    }
200    /// Look up a collection bundle by an identifier.
201    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
202        self.bindings.get(&id).cloned()
203    }
204
205    pub(super) fn error_logger(&self) -> ErrorLogger {
206        ErrorLogger::new(self.shutdown_probe.clone(), self.debug_name.clone())
207    }
208}
209
210impl<S: Scope, T> Context<S, T>
211where
212    T: MzTimestamp,
213    S::Timestamp: MzTimestamp + Refines<T>,
214{
215    /// Brings the underlying arrangements and collections into a region.
216    pub fn enter_region<'a>(
217        &self,
218        region: &Child<'a, S, S::Timestamp>,
219        bindings: Option<&std::collections::BTreeSet<Id>>,
220    ) -> Context<Child<'a, S, S::Timestamp>, T> {
221        let bindings = self
222            .bindings
223            .iter()
224            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
225            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
226            .collect();
227
228        Context {
229            scope: region.clone(),
230            debug_name: self.debug_name.clone(),
231            dataflow_id: self.dataflow_id.clone(),
232            as_of_frontier: self.as_of_frontier.clone(),
233            until: self.until.clone(),
234            shutdown_probe: self.shutdown_probe.clone(),
235            hydration_logger: self.hydration_logger.clone(),
236            compute_logger: self.compute_logger.clone(),
237            linear_join_spec: self.linear_join_spec.clone(),
238            bindings,
239            dataflow_expiration: self.dataflow_expiration.clone(),
240            config_set: Rc::clone(&self.config_set),
241        }
242    }
243}
244
245pub(super) fn shutdown_token<G: Scope>(scope: &mut G) -> (ShutdownProbe, PressOnDropButton) {
246    let (button_handle, button) = mz_timely_util::builder_async::button(scope, scope.addr());
247    let probe = ShutdownProbe::new(button_handle);
248    let token = button.press_on_drop();
249    (probe, token)
250}
251
252/// Convenient wrapper around an optional `ButtonHandle` that can be used to check whether a
253/// dataflow is shutting down.
254///
255/// Instances created through the `Default` impl act as if the dataflow never shuts down.
256/// Instances created through [`ShutdownProbe::new`] defer to the wrapped button.
257#[derive(Clone, Default)]
258pub(super) struct ShutdownProbe(Option<Rc<RefCell<ButtonHandle>>>);
259
260impl ShutdownProbe {
261    /// Construct a `ShutdownProbe` instance that defers to `button`.
262    fn new(button: ButtonHandle) -> Self {
263        Self(Some(Rc::new(RefCell::new(button))))
264    }
265
266    /// Returns whether the dataflow is in the process of shutting down.
267    ///
268    /// The result of this method is synchronized among workers: It only returns `true` once all
269    /// workers have dropped their shutdown token.
270    pub(super) fn in_shutdown(&self) -> bool {
271        match &self.0 {
272            Some(t) => t.borrow_mut().all_pressed(),
273            None => false,
274        }
275    }
276
277    /// Returns whether the dataflow is in the process of shutting down on the current worker.
278    ///
279    /// In contrast to [`ShutdownProbe::in_shutdown`], this method returns `true` as soon as the
280    /// current worker has dropped its shutdown token, without waiting for other workers.
281    pub(super) fn in_local_shutdown(&self) -> bool {
282        match &self.0 {
283            Some(t) => t.borrow_mut().local_pressed(),
284            None => false,
285        }
286    }
287}
288
289/// A logger for operator hydration events emitted for a dataflow export.
290#[derive(Clone)]
291pub(super) struct HydrationLogger {
292    export_ids: Vec<GlobalId>,
293    tx: mpsc::Sender<HydrationEvent>,
294}
295
296impl HydrationLogger {
297    /// Log a hydration event for the identified LIR node.
298    ///
299    /// The expectation is that rendering code arranges for `hydrated = false` to be logged for
300    /// each LIR node when a dataflow is first created. Then `hydrated = true` should be logged as
301    /// operators become hydrated.
302    pub fn log(&self, lir_id: LirId, hydrated: bool) {
303        for &export_id in &self.export_ids {
304            let event = HydrationEvent {
305                export_id,
306                lir_id,
307                hydrated,
308            };
309            if self.tx.send(event).is_err() {
310                error!("hydration event receiver dropped unexpectely");
311            }
312        }
313    }
314}
315
316/// Describes flavor of arrangement: local or imported trace.
317#[derive(Clone)]
318pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
319where
320    T: MzTimestamp,
321    S::Timestamp: MzTimestamp + Refines<T>,
322{
323    /// A dataflow-local arrangement.
324    Local(
325        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
326        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
327    ),
328    /// An imported trace from outside the dataflow.
329    ///
330    /// The `GlobalId` identifier exists so that exports of this same trace
331    /// can refer back to and depend on the original instance.
332    Trace(
333        GlobalId,
334        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
335        Arranged<S, ErrEnter<T, S::Timestamp>>,
336    ),
337}
338
impl<S: Scope, T> ArrangementFlavor<S, T>
where
    T: MzTimestamp,
    S::Timestamp: MzTimestamp + Refines<T>,
{
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    ///
    /// Returns `(oks, errs)`. Each `oks` record is one `Row` holding the key datums followed
    /// by the value datums. Each `errs` record is a clone of the error arrangement's key.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
        // Scratch datum buffer, moved into `logic` and borrowed afresh for each record.
        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        // Both arms do the same thing; they are spelled out separately because the
        // local and imported arrangements have different concrete types.
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.as_collection(logic),
                errs.as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
    /// an iterator of `(D, S::Timestamp, Diff)` updates.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    ///
    /// Returns the stream produced by `logic` for the ok arrangement, together with the
    /// error arrangement's keys as a collection (errors are not passed through `logic`).
    pub fn flat_map<D, I, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
    where
        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
        D: Data,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
    {
        // Set a number of tuples after which the operator should yield.
        // This allows us to remain responsive even when enumerating a substantial
        // arrangement, as well as provides time to accumulate our produced output.
        let refuel = 1000000;

        let mut datums = DatumVec::new();
        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
            let mut datums_borrow = datums.borrow();
            // Decode at most `max_demand` datums in total: first from the key, then
            // whatever budget remains from the value.
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d)
        };

        // As in `as_collection`, the arms differ only in the concrete arrangement types.
        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
                let errs = errs.as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
423impl<S: Scope, T> ArrangementFlavor<S, T>
424where
425    T: MzTimestamp,
426    S::Timestamp: MzTimestamp + Refines<T>,
427{
428    /// The scope containing the collection bundle.
429    pub fn scope(&self) -> S {
430        match self {
431            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
432            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
433        }
434    }
435
436    /// Brings the arrangement flavor into a region.
437    pub fn enter_region<'a>(
438        &self,
439        region: &Child<'a, S, S::Timestamp>,
440    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
441        match self {
442            ArrangementFlavor::Local(oks, errs) => {
443                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
444            }
445            ArrangementFlavor::Trace(gid, oks, errs) => {
446                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
447            }
448        }
449    }
450}
451impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
452where
453    T: MzTimestamp,
454    S::Timestamp: MzTimestamp + Refines<T>,
455{
456    /// Extracts the arrangement flavor from a region.
457    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
458        match self {
459            ArrangementFlavor::Local(oks, errs) => {
460                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
461            }
462            ArrangementFlavor::Trace(gid, oks, errs) => {
463                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
464            }
465        }
466    }
467}
468
469/// A bundle of the various ways a collection can be represented.
470///
471/// This type maintains the invariant that it does contain at least one valid
472/// source of data, either a collection or at least one arrangement.
473#[derive(Clone)]
474pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
475where
476    T: MzTimestamp,
477    S::Timestamp: MzTimestamp + Refines<T>,
478{
479    pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
480    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
481}
482
483impl<S: Scope, T> CollectionBundle<S, T>
484where
485    T: MzTimestamp,
486    S::Timestamp: MzTimestamp + Refines<T>,
487{
488    /// Construct a new collection bundle from update streams.
489    pub fn from_collections(
490        oks: Collection<S, Row, Diff>,
491        errs: Collection<S, DataflowError, Diff>,
492    ) -> Self {
493        Self {
494            collection: Some((oks, errs)),
495            arranged: BTreeMap::default(),
496        }
497    }
498
499    /// Inserts arrangements by the expressions on which they are keyed.
500    pub fn from_expressions(
501        exprs: Vec<MirScalarExpr>,
502        arrangements: ArrangementFlavor<S, T>,
503    ) -> Self {
504        let mut arranged = BTreeMap::new();
505        arranged.insert(exprs, arrangements);
506        Self {
507            collection: None,
508            arranged,
509        }
510    }
511
512    /// Inserts arrangements by the columns on which they are keyed.
513    pub fn from_columns<I: IntoIterator<Item = usize>>(
514        columns: I,
515        arrangements: ArrangementFlavor<S, T>,
516    ) -> Self {
517        let mut keys = Vec::new();
518        for column in columns {
519            keys.push(MirScalarExpr::column(column));
520        }
521        Self::from_expressions(keys, arrangements)
522    }
523
524    /// The scope containing the collection bundle.
525    pub fn scope(&self) -> S {
526        if let Some((oks, _errs)) = &self.collection {
527            oks.inner.scope()
528        } else {
529            self.arranged
530                .values()
531                .next()
532                .expect("Must contain a valid collection")
533                .scope()
534        }
535    }
536
537    /// Brings the collection bundle into a region.
538    pub fn enter_region<'a>(
539        &self,
540        region: &Child<'a, S, S::Timestamp>,
541    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
542        CollectionBundle {
543            collection: self
544                .collection
545                .as_ref()
546                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
547            arranged: self
548                .arranged
549                .iter()
550                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
551                .collect(),
552        }
553    }
554}
555
556impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
557where
558    T: MzTimestamp,
559    S::Timestamp: MzTimestamp + Refines<T>,
560{
561    /// Extracts the collection bundle from a region.
562    pub fn leave_region(&self) -> CollectionBundle<S, T> {
563        CollectionBundle {
564            collection: self
565                .collection
566                .as_ref()
567                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
568            arranged: self
569                .arranged
570                .iter()
571                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
572                .collect(),
573        }
574    }
575}
576
577impl<S: Scope, T> CollectionBundle<S, T>
578where
579    T: MzTimestamp,
580    S::Timestamp: MzTimestamp + Refines<T>,
581{
582    /// Asserts that the arrangement for a specific key
583    /// (or the raw collection for no key) exists,
584    /// and returns the corresponding collection.
585    ///
586    /// This returns the collection as-is, without
587    /// doing any unthinning transformation.
588    /// Therefore, it should be used when the appropriate transformation
589    /// was planned as part of a following MFP.
590    ///
591    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
592    /// the fueled `flat_map` or `as_collection` method, depending on the flag
593    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
594    pub fn as_specific_collection(
595        &self,
596        key: Option<&[MirScalarExpr]>,
597        config_set: &ConfigSet,
598    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
599        // Any operator that uses this method was told to use a particular
600        // collection during LIR planning, where we should have made
601        // sure that that collection exists.
602        //
603        // If it doesn't, we panic.
604        match key {
605            None => self
606                .collection
607                .clone()
608                .expect("The unarranged collection doesn't exist."),
609            Some(key) => {
610                let arranged = self.arranged.get(key).unwrap_or_else(|| {
611                    panic!("The collection arranged by {:?} doesn't exist.", key)
612                });
613                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
614                    // Decode all columns, pass max_demand as usize::MAX.
615                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
616                        Some((SharedRow::pack(borrow.iter()), t, r))
617                    });
618                    (ok.as_collection(), err)
619                } else {
620                    #[allow(deprecated)]
621                    arranged.as_collection()
622                }
623            }
624        }
625    }
626
627    /// Constructs and applies logic to elements of a collection and returns the results.
628    ///
629    /// The function applies `logic` on elements. The logic conceptually receives
630    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
631    ///
632    /// If `key_val` is set, this is a promise that `logic` will produce no results on
633    /// records for which the key does not evaluate to the value. This is used when we
634    /// have an arrangement by that key to leap directly to exactly those records.
635    /// It is important that `logic` still guard against data that does not satisfy
636    /// this constraint, as this method does not statically know that it will have
637    /// that arrangement.
638    ///
639    /// The `max_demand` parameter limits the number of columns decoded from the
640    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
641    /// decode all columns.
642    pub fn flat_map<D, I, L>(
643        &self,
644        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
645        max_demand: usize,
646        mut logic: L,
647    ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
648    where
649        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
650        D: Data,
651        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
652    {
653        // If `key_val` is set, we should have to use the corresponding arrangement.
654        // If there isn't one, that implies an error in the contract between
655        // key-production and available arrangements.
656        if let Some((key, val)) = key_val {
657            self.arrangement(&key)
658                .expect("Should have ensured during planning that this arrangement exists.")
659                .flat_map(val.as_ref(), max_demand, logic)
660        } else {
661            use timely::dataflow::operators::Map;
662            let (oks, errs) = self
663                .collection
664                .clone()
665                .expect("Invariant violated: CollectionBundle contains no collection.");
666            let mut datums = DatumVec::new();
667            let oks = oks.inner.flat_map(move |(v, t, d)| {
668                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
669            });
670            (oks, errs)
671        }
672    }
673
674    /// Factored out common logic for using literal keys in general traces.
675    ///
676    /// This logic is sufficiently interesting that we want to write it only
677    /// once, and thereby avoid any skew in the two uses of the logic.
678    ///
679    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
680    /// where key and value are potentially specialized, but convertible into rows.
681    fn flat_map_core<Tr, D, I, L>(
682        trace: &Arranged<S, Tr>,
683        key: Option<&Tr::KeyOwn>,
684        mut logic: L,
685        refuel: usize,
686    ) -> Stream<S, I::Item>
687    where
688        Tr: for<'a> TraceReader<
689                Key<'a>: ToDatumIter,
690                KeyOwn: PartialEq,
691                Val<'a>: ToDatumIter,
692                Time = S::Timestamp,
693                Diff = mz_repr::Diff,
694            > + Clone
695            + 'static,
696        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
697        D: Data,
698        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
699    {
700        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
701
702        let mut key_con = Tr::KeyContainer::with_capacity(1);
703        if let Some(key) = &key {
704            key_con.push_own(key);
705        }
706        let mode = if key.is_some() { "index" } else { "scan" };
707        let name = format!("ArrangementFlatMap({})", mode);
708        use timely::dataflow::operators::Operator;
709        trace
710            .stream
711            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
712                // Acquire an activator to reschedule the operator when it has unfinished work.
713                let activator = trace.stream.scope().activator_for(info.address);
714                // Maintain a list of work to do, cursor to navigate and process.
715                let mut todo = std::collections::VecDeque::new();
716                move |input, output| {
717                    let key = key_con.get(0);
718                    // First, dequeue all batches.
719                    input.for_each(|time, data| {
720                        let capability = time.retain();
721                        for batch in data.iter() {
722                            // enqueue a capability, cursor, and batch.
723                            todo.push_back(PendingWork::new(
724                                capability.clone(),
725                                batch.cursor(),
726                                batch.clone(),
727                            ));
728                        }
729                    });
730
731                    // Second, make progress on `todo`.
732                    let mut fuel = refuel;
733                    while !todo.is_empty() && fuel > 0 {
734                        todo.front_mut().unwrap().do_work(
735                            key.as_ref(),
736                            &mut logic,
737                            &mut fuel,
738                            output,
739                        );
740                        if fuel > 0 {
741                            todo.pop_front();
742                        }
743                    }
744                    // If we have not finished all work, re-activate the operator.
745                    if !todo.is_empty() {
746                        activator.activate();
747                    }
748                }
749            })
750    }
751
752    /// Look up an arrangement by the expressions that form the key.
753    ///
754    /// The result may be `None` if no such arrangement exists, or it may be one of many
755    /// "arrangement flavors" that represent the types of arranged data we might have.
756    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
757        self.arranged.get(key).map(|x| x.clone())
758    }
759}
760
761impl<S, T> CollectionBundle<S, T>
762where
763    T: MzTimestamp,
764    S: Scope,
765    S::Timestamp: Refines<T> + RenderTimestamp,
766{
767    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
768    ///
769    /// This operator is able to apply the logic of `mfp` early, which can substantially
770    /// reduce the amount of data produced when `mfp` is non-trivial.
771    ///
772    /// The `key_val` argument, when present, indicates that a specific arrangement should
773    /// be used, and if, in addition, the `val` component is present,
774    /// that we can seek to the supplied row.
775    pub fn as_collection_core(
776        &self,
777        mut mfp: MapFilterProject,
778        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
779        until: Antichain<mz_repr::Timestamp>,
780        config_set: &ConfigSet,
781    ) -> (
782        Collection<S, mz_repr::Row, Diff>,
783        Collection<S, DataflowError, Diff>,
784    ) {
785        mfp.optimize();
786        let mfp_plan = mfp.clone().into_plan().unwrap();
787
788        // If the MFP is trivial, we can just call `as_collection`.
789        // In the case that we weren't going to apply the `key_val` optimization,
790        // this path results in a slightly smaller and faster
791        // dataflow graph, and is intended to fix
792        // https://github.com/MaterializeInc/database-issues/issues/3111
793        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
794            true
795        } else {
796            false
797        };
798
799        if mfp_plan.is_identity() && !has_key_val {
800            let key = key_val.map(|(k, _v)| k);
801            return self.as_specific_collection(key.as_deref(), config_set);
802        }
803
804        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
805        mfp.permute_fn(|c| c, max_demand);
806        mfp.optimize();
807        let mfp_plan = mfp.into_plan().unwrap();
808
809        let mut datum_vec = DatumVec::new();
810        // Wrap in an `Rc` so that lifetimes work out.
811        let until = std::rc::Rc::new(until);
812
813        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
814            let mut row_builder = SharedRow::get();
815            let until = std::rc::Rc::clone(&until);
816            let temp_storage = RowArena::new();
817            let row_iter = row_datums.iter();
818            let mut datums_local = datum_vec.borrow();
819            datums_local.extend(row_iter);
820            let time = time.clone();
821            let event_time = time.event_time();
822            mfp_plan
823                .evaluate(
824                    &mut datums_local,
825                    &temp_storage,
826                    event_time,
827                    diff.clone(),
828                    move |time| !until.less_equal(time),
829                    &mut row_builder,
830                )
831                .map(move |x| match x {
832                    Ok((row, event_time, diff)) => {
833                        // Copy the whole time, and re-populate event time.
834                        let mut time: S::Timestamp = time.clone();
835                        *time.event_time_mut() = event_time;
836                        (Ok(row), time, diff)
837                    }
838                    Err((e, event_time, diff)) => {
839                        // Copy the whole time, and re-populate event time.
840                        let mut time: S::Timestamp = time.clone();
841                        *time.event_time_mut() = event_time;
842                        (Err(e), time, diff)
843                    }
844                })
845        });
846
847        use differential_dataflow::AsCollection;
848        let (oks, errs) = stream
849            .as_collection()
850            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
851                "OkErr",
852                |x| x,
853            );
854
855        (oks, errors.concat(&errs))
856    }
857    pub fn ensure_collections(
858        mut self,
859        collections: AvailableCollections,
860        input_key: Option<Vec<MirScalarExpr>>,
861        input_mfp: MapFilterProject,
862        until: Antichain<mz_repr::Timestamp>,
863        config_set: &ConfigSet,
864    ) -> Self {
865        if collections == Default::default() {
866            return self;
867        }
868        // Cache collection to avoid reforming it each time.
869        //
870        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
871        // as the `map_fallible` that follows could be run against an arrangement itself.
872        //
873        // Note(btv): If we ever do that, we would then only need to make the raw collection here
874        // if `collections.raw` is true.
875
876        for (key, _, _) in collections.arranged.iter() {
877            soft_assert_or_log!(
878                !self.arranged.contains_key(key),
879                "LIR ArrangeBy tried to create an existing arrangement"
880            );
881        }
882
883        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
884        let form_raw_collection = collections.raw
885            || collections
886                .arranged
887                .iter()
888                .any(|(key, _, _)| !self.arranged.contains_key(key));
889        if form_raw_collection && self.collection.is_none() {
890            self.collection = Some(self.as_collection_core(
891                input_mfp,
892                input_key.map(|k| (k, None)),
893                until,
894                config_set,
895            ));
896        }
897        for (key, _, thinning) in collections.arranged {
898            if !self.arranged.contains_key(&key) {
899                // TODO: Consider allowing more expressive names.
900                let name = format!("ArrangeBy[{:?}]", key);
901
902                let (oks, errs) = self
903                    .collection
904                    .clone()
905                    .expect("Collection constructed above");
906                let (oks, errs_keyed) =
907                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
908                let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
909                let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
910                    &format!("{}-errors", name),
911                );
912                self.arranged
913                    .insert(key, ArrangementFlavor::Local(oks, errs));
914            }
915        }
916        self
917    }
918
919    /// Builds an arrangement from a collection, using the specified key and value thinning.
920    ///
921    /// The arrangement's key is based on the `key` expressions, and the value the input with
922    /// the `thinning` applied to it. It selects which of the input columns are included in the
923    /// value of the arrangement. The thinning is in support of permuting arrangements such that
924    /// columns in the key are not included in the value.
925    fn arrange_collection(
926        name: &String,
927        oks: Collection<S, Row, Diff>,
928        key: Vec<MirScalarExpr>,
929        thinning: Vec<usize>,
930    ) -> (
931        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
932        Collection<S, DataflowError, Diff>,
933    ) {
934        // The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
935        // for the ok stream. The `map_fallible` cannot be used here because the closure cannot
936        // return references, which is what we need to push into columnar streams. Instead, we use
937        // a bespoke operator that also optimizes reuse of allocations across individual updates.
938        let (oks, errs) = oks
939            .inner
940            .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
941                Pipeline,
942                "FormArrangementKey",
943                move |_, _| {
944                    Box::new(move |input, ok, err| {
945                        let mut key_buf = Row::default();
946                        let mut val_buf = Row::default();
947                        let mut datums = DatumVec::new();
948                        let mut temp_storage = RowArena::new();
949                        while let Some((time, data)) = input.next() {
950                            let mut ok_session = ok.session_with_builder(&time);
951                            let mut err_session = err.session(&time);
952                            for (row, time, diff) in data.iter() {
953                                temp_storage.clear();
954                                let datums = datums.borrow_with(row);
955                                let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
956                                match key_buf.packer().try_extend(key_iter) {
957                                    Ok(()) => {
958                                        let val_datum_iter = thinning.iter().map(|c| datums[*c]);
959                                        val_buf.packer().extend(val_datum_iter);
960                                        ok_session.give(((&*key_buf, &*val_buf), time, diff));
961                                    }
962                                    Err(e) => {
963                                        err_session.give((e.into(), time.clone(), *diff));
964                                    }
965                                }
966                            }
967                        }
968                    })
969                },
970            );
971        let oks = oks
972            .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
973                ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
974            );
975        (oks, errs.as_collection())
976    }
977}
978
/// A unit of deferred work: one input batch, a cursor into it, and the capability at which
/// output derived from the batch is sent.
///
/// Holding the cursor lets work on the batch stop when fuel runs out and resume later from
/// the same position.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability retained for the time at which the batch was received. Output produced
    /// from the batch is given at this capability.
    capability: Capability<C::Time>,
    /// Cursor over `batch`, retaining the position reached so far.
    cursor: C,
    /// The batch storage that `cursor` navigates.
    batch: C::Storage,
}
987
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// * `key`: when `Some`, only updates for that key are visited. The cursor is sought to the
    ///   key unless it is already positioned on it, and nothing is produced if the batch does
    ///   not contain it. When `None`, every key in the batch is visited in cursor order.
    /// * `logic`: applied to each `(key, val, time, diff)` after the `(time, diff)` updates of
    ///   a `(key, val)` pair have been consolidated. Every item it yields is given to `output`
    ///   at `self.capability`.
    /// * `fuel`: the work budget, counted as the number of items produced by `logic`. Values
    ///   for which `logic` yields nothing consume no fuel.
    ///
    /// Fuel is checked after each value. If the budget is exhausted, `*fuel` is set to `0` and
    /// the method returns with the cursor already advanced past the last processed value, so a
    /// later call resumes where this one stopped. Otherwise the batch (or the requested key) has
    /// been fully processed and `*fuel` is reduced by the work done. Because an exhausted budget
    /// would have returned early, `*fuel` is then still strictly positive.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputHandleCore<
            '_,
            C::Time,
            ConsolidatingContainerBuilder<Vec<I::Item>>,
            timely::dataflow::channels::pushers::Tee<C::Time, Vec<I::Item>>,
        >,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for the `(time, diff)` updates of one `(key, val)` pair.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            // Reborrow the caller's key so it can be compared with, and sought for in, this
            // batch's keys.
            let key = C::KeyContainer::reborrow(*key);
            // Only seek if the cursor is not already on the key (e.g. when resuming).
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // The seek may land past the key if the batch does not contain it.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Advance before the fuel check so a resumed call does not revisit this value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Advance before the fuel check so a resumed call does not revisit this value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Reached only when all work finished within budget, so `work < *fuel` here.
        *fuel -= work;
    }
}