mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::{Rc, Weak};
15use std::sync::mpsc;
16
17use columnar::Columnar;
18use differential_dataflow::IntoOwned;
19use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
20use differential_dataflow::containers::Columnation;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::operators::arrange::Arranged;
23use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
24use differential_dataflow::{AsCollection, Collection, Data};
25use mz_compute_types::dataflows::DataflowDescription;
26use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
27use mz_compute_types::plan::{AvailableCollections, LirId};
28use mz_dyncfg::ConfigSet;
29use mz_expr::{Id, MapFilterProject, MirScalarExpr};
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_storage_types::errors::DataflowError;
34use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange};
35use mz_timely_util::operator::{CollectionExt, StreamExt};
36use timely::Container;
37use timely::container::CapacityContainerBuilder;
38use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
39use timely::dataflow::operators::Capability;
40use timely::dataflow::operators::generic::OutputHandleCore;
41use timely::dataflow::scopes::Child;
42use timely::dataflow::{Scope, Stream};
43use timely::progress::timestamp::Refines;
44use timely::progress::{Antichain, Timestamp};
45use tracing::error;
46
47use crate::compute_state::{ComputeState, HydrationEvent};
48use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
49use crate::render::errors::ErrorLogger;
50use crate::render::{LinearJoinSpec, RenderTimestamp};
51use crate::row_spine::{DatumSeq, RowRowBuilder};
52use crate::typedefs::{
53    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
54};
55
/// Dataflow-local collections and arrangements.
///
/// A context wraps the data assets available while rendering a dataflow and presents them
/// in an easy-to-use manner. These assets include dataflow-local collections and
/// arrangements, as well as arrangements imported from outside the dataflow.
///
/// A context has two timestamp types, `S::Timestamp` and `T`, where the former must refine
/// the latter. `S::Timestamp` is the timestamp used by the scope in question, and `T` is the
/// timestamp of imported traces. The two may differ in the case of regions or iteration.
pub struct Context<S: Scope, T = mz_repr::Timestamp>
where
    T: Timestamp + Lattice + Columnation,
    S::Timestamp: Lattice + Refines<T> + Columnation,
{
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: S,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<T>,
    /// Frontier after which updates should not be emitted.
    ///
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<T>,
    /// Bindings of identifiers to collection bundles.
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    /// A token that operators can probe to know whether the dataflow is shutting down.
    pub(super) shutdown_token: ShutdownToken,
    /// A logger that operators can use to report hydration events.
    ///
    /// `None` if no hydration events should be logged in this context.
    pub(super) hydration_logger: Option<HydrationLogger>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context.
    ///
    /// The output's frontier should never advance past this frontier, except to the empty
    /// frontier.
    pub dataflow_expiration: Antichain<T>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
105
106impl<S: Scope> Context<S>
107where
108    S::Timestamp: Lattice + Refines<mz_repr::Timestamp> + Columnation,
109{
110    /// Creates a new empty Context.
111    pub fn for_dataflow_in<Plan>(
112        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
113        scope: S,
114        compute_state: &ComputeState,
115        until: Antichain<mz_repr::Timestamp>,
116        dataflow_expiration: Antichain<mz_repr::Timestamp>,
117    ) -> Self {
118        use mz_ore::collections::CollectionExt as IteratorExt;
119        let dataflow_id = *scope.addr().into_first();
120        let as_of_frontier = dataflow
121            .as_of
122            .clone()
123            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
124
125        // Skip operator hydration logging for transient dataflows. We do this to avoid overhead
126        // for slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
127        // want to reconsider in the future.
128        //
129        // Similarly, we won't capture a compute_logger for logging LIR->address mappings for transient dataflows.
130        let (hydration_logger, compute_logger) = if dataflow.is_transient() {
131            (None, None)
132        } else {
133            (
134                Some(HydrationLogger {
135                    export_ids: dataflow.export_ids().collect(),
136                    tx: compute_state.hydration_tx.clone(),
137                }),
138                compute_state.compute_logger.clone(),
139            )
140        };
141
142        Self {
143            scope,
144            debug_name: dataflow.debug_name.clone(),
145            dataflow_id,
146            as_of_frontier,
147            until,
148            bindings: BTreeMap::new(),
149            shutdown_token: Default::default(),
150            hydration_logger,
151            compute_logger,
152            linear_join_spec: compute_state.linear_join_spec,
153            dataflow_expiration,
154            config_set: Rc::clone(&compute_state.worker_config),
155        }
156    }
157}
158
159impl<S: Scope, T> Context<S, T>
160where
161    T: Timestamp + Lattice + Columnation,
162    S::Timestamp: Lattice + Refines<T> + Columnation,
163{
164    /// Insert a collection bundle by an identifier.
165    ///
166    /// This is expected to be used to install external collections (sources, indexes, other views),
167    /// as well as for `Let` bindings of local collections.
168    pub fn insert_id(
169        &mut self,
170        id: Id,
171        collection: CollectionBundle<S, T>,
172    ) -> Option<CollectionBundle<S, T>> {
173        self.bindings.insert(id, collection)
174    }
175    /// Remove a collection bundle by an identifier.
176    ///
177    /// The primary use of this method is uninstalling `Let` bindings.
178    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
179        self.bindings.remove(&id)
180    }
181    /// Melds a collection bundle to whatever exists.
182    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
183        if !self.bindings.contains_key(&id) {
184            self.bindings.insert(id, collection);
185        } else {
186            let binding = self
187                .bindings
188                .get_mut(&id)
189                .expect("Binding verified to exist");
190            if collection.collection.is_some() {
191                binding.collection = collection.collection;
192            }
193            for (key, flavor) in collection.arranged.into_iter() {
194                binding.arranged.insert(key, flavor);
195            }
196        }
197    }
198    /// Look up a collection bundle by an identifier.
199    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
200        self.bindings.get(&id).cloned()
201    }
202
203    pub(super) fn error_logger(&self) -> ErrorLogger {
204        ErrorLogger::new(self.shutdown_token.clone(), self.debug_name.clone())
205    }
206}
207
208impl<S: Scope, T> Context<S, T>
209where
210    T: Timestamp + Lattice + Columnation,
211    S::Timestamp: Lattice + Refines<T> + Columnation,
212{
213    /// Brings the underlying arrangements and collections into a region.
214    pub fn enter_region<'a>(
215        &self,
216        region: &Child<'a, S, S::Timestamp>,
217        bindings: Option<&std::collections::BTreeSet<Id>>,
218    ) -> Context<Child<'a, S, S::Timestamp>, T> {
219        let bindings = self
220            .bindings
221            .iter()
222            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
223            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
224            .collect();
225
226        Context {
227            scope: region.clone(),
228            debug_name: self.debug_name.clone(),
229            dataflow_id: self.dataflow_id.clone(),
230            as_of_frontier: self.as_of_frontier.clone(),
231            until: self.until.clone(),
232            shutdown_token: self.shutdown_token.clone(),
233            hydration_logger: self.hydration_logger.clone(),
234            compute_logger: self.compute_logger.clone(),
235            linear_join_spec: self.linear_join_spec.clone(),
236            bindings,
237            dataflow_expiration: self.dataflow_expiration.clone(),
238            config_set: Rc::clone(&self.config_set),
239        }
240    }
241}
242
243/// Convenient wrapper around an optional `Weak` instance that can be used to check whether a
244/// datalow is shutting down.
245///
246/// Instances created through the `Default` impl act as if the dataflow never shuts down.
247/// Instances created through [`ShutdownToken::new`] defer to the wrapped token.
248#[derive(Clone, Default)]
249pub(super) struct ShutdownToken(Option<Weak<()>>);
250
251impl ShutdownToken {
252    /// Construct a `ShutdownToken` instance that defers to `token`.
253    pub(super) fn new(token: Weak<()>) -> Self {
254        Self(Some(token))
255    }
256
257    /// Probe the token for dataflow shutdown.
258    ///
259    /// This method is meant to be used with the `?` operator: It returns `None` if the dataflow is
260    /// in the process of shutting down and `Some` otherwise.
261    pub(super) fn probe(&self) -> Option<()> {
262        match &self.0 {
263            Some(t) => t.upgrade().map(|_| ()),
264            None => Some(()),
265        }
266    }
267
268    /// Returns whether the dataflow is in the process of shutting down.
269    pub(super) fn in_shutdown(&self) -> bool {
270        self.probe().is_none()
271    }
272
273    /// Returns a reference to the wrapped `Weak`.
274    pub(crate) fn get_inner(&self) -> Option<&Weak<()>> {
275        self.0.as_ref()
276    }
277}
278
/// A logger for operator hydration events emitted for a dataflow export.
#[derive(Clone)]
pub(super) struct HydrationLogger {
    /// The export IDs to report for; `log` emits one event per ID.
    export_ids: Vec<GlobalId>,
    /// The sending half of the channel on which hydration events are delivered.
    tx: mpsc::Sender<HydrationEvent>,
}
285
286impl HydrationLogger {
287    /// Log a hydration event for the identified LIR node.
288    ///
289    /// The expectation is that rendering code arranges for `hydrated = false` to be logged for
290    /// each LIR node when a dataflow is first created. Then `hydrated = true` should be logged as
291    /// operators become hydrated.
292    pub fn log(&self, lir_id: LirId, hydrated: bool) {
293        for &export_id in &self.export_ids {
294            let event = HydrationEvent {
295                export_id,
296                lir_id,
297                hydrated,
298            };
299            if self.tx.send(event).is_err() {
300                error!("hydration event receiver dropped unexpectely");
301            }
302        }
303    }
304}
305
/// Describes the flavor of an arrangement: dataflow-local or an imported trace.
///
/// Both variants carry a pair of arrangements, one with the data ("oks") and one with the
/// errors ("errs").
#[derive(Clone)]
pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
where
    T: Timestamp + Lattice + Columnation,
    S::Timestamp: Lattice + Refines<T> + Columnation,
{
    /// A dataflow-local arrangement.
    Local(
        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    Trace(
        GlobalId,
        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
        Arranged<S, ErrEnter<T, S::Timestamp>>,
    ),
}
328
329impl<S: Scope, T> ArrangementFlavor<S, T>
330where
331    T: Timestamp + Lattice + Columnation,
332    S::Timestamp: Lattice + Refines<T> + Columnation,
333{
334    /// Presents `self` as a stream of updates.
335    ///
336    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
337    ///
338    /// This method presents the contents as they are, without further computation.
339    /// If you have logic that could be applied to each record, consider using the
340    /// `flat_map` methods which allows this and can reduce the work done.
341    #[deprecated(note = "Use `flat_map` instead.")]
342    pub fn as_collection(&self) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
343        let mut datums = DatumVec::new();
344        let logic = move |k: DatumSeq, v: DatumSeq| {
345            let mut datums_borrow = datums.borrow();
346            datums_borrow.extend(k);
347            datums_borrow.extend(v);
348            SharedRow::pack(&**datums_borrow)
349        };
350        match &self {
351            ArrangementFlavor::Local(oks, errs) => (
352                oks.as_collection(logic),
353                errs.as_collection(|k, &()| k.clone()),
354            ),
355            ArrangementFlavor::Trace(_, oks, errs) => (
356                oks.as_collection(logic),
357                errs.as_collection(|k, &()| k.clone()),
358            ),
359        }
360    }
361
362    /// Constructs and applies logic to elements of `self` and returns the results.
363    ///
364    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
365    /// an iterator of `(D, S::Timestamp, Diff)` updates.
366    ///
367    /// If `key` is set, this is a promise that `logic` will produce no results on
368    /// records for which the key does not evaluate to the value. This is used to
369    /// leap directly to exactly those records.
370    ///
371    /// The `max_demand` parameter limits the number of columns decoded from the
372    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
373    /// decode all columns.
374    pub fn flat_map<D, I, L>(
375        &self,
376        key: Option<Row>,
377        max_demand: usize,
378        mut logic: L,
379    ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
380    where
381        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
382        D: Data,
383        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
384    {
385        // Set a number of tuples after which the operator should yield.
386        // This allows us to remain responsive even when enumerating a substantial
387        // arrangement, as well as provides time to accumulate our produced output.
388        let refuel = 1000000;
389
390        let mut datums = DatumVec::new();
391        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
392            let mut datums_borrow = datums.borrow();
393            datums_borrow.extend(k.to_datum_iter().take(max_demand));
394            let max_demand = max_demand.saturating_sub(datums_borrow.len());
395            datums_borrow.extend(v.to_datum_iter().take(max_demand));
396            logic(&mut datums_borrow, t, d)
397        };
398
399        match &self {
400            ArrangementFlavor::Local(oks, errs) => {
401                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
402                let errs = errs.as_collection(|k, &()| k.clone());
403                (oks, errs)
404            }
405            ArrangementFlavor::Trace(_, oks, errs) => {
406                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
407                let errs = errs.as_collection(|k, &()| k.clone());
408                (oks, errs)
409            }
410        }
411    }
412}
413impl<S: Scope, T> ArrangementFlavor<S, T>
414where
415    T: Timestamp + Lattice + Columnation,
416    S::Timestamp: Lattice + Refines<T> + Columnation,
417{
418    /// The scope containing the collection bundle.
419    pub fn scope(&self) -> S {
420        match self {
421            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
422            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
423        }
424    }
425
426    /// Brings the arrangement flavor into a region.
427    pub fn enter_region<'a>(
428        &self,
429        region: &Child<'a, S, S::Timestamp>,
430    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
431        match self {
432            ArrangementFlavor::Local(oks, errs) => {
433                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
434            }
435            ArrangementFlavor::Trace(gid, oks, errs) => {
436                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
437            }
438        }
439    }
440}
441impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
442where
443    T: Timestamp + Lattice + Columnation,
444    S::Timestamp: Lattice + Refines<T> + Columnation,
445{
446    /// Extracts the arrangement flavor from a region.
447    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
448        match self {
449            ArrangementFlavor::Local(oks, errs) => {
450                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
451            }
452            ArrangementFlavor::Trace(gid, oks, errs) => {
453                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
454            }
455        }
456    }
457}
458
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
where
    T: Timestamp + Lattice + Columnation,
    S::Timestamp: Lattice + Refines<T> + Columnation,
{
    /// The unarranged representation, if present: a collection of rows paired with a
    /// collection of errors.
    pub collection: Option<(Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)>,
    /// The arranged representations, indexed by the expressions on which each arrangement
    /// is keyed.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
}
472
473impl<S: Scope, T: Lattice> CollectionBundle<S, T>
474where
475    T: Timestamp + Lattice + Columnation,
476    S::Timestamp: Lattice + Refines<T> + Columnation,
477{
478    /// Construct a new collection bundle from update streams.
479    pub fn from_collections(
480        oks: Collection<S, Row, Diff>,
481        errs: Collection<S, DataflowError, Diff>,
482    ) -> Self {
483        Self {
484            collection: Some((oks, errs)),
485            arranged: BTreeMap::default(),
486        }
487    }
488
489    /// Inserts arrangements by the expressions on which they are keyed.
490    pub fn from_expressions(
491        exprs: Vec<MirScalarExpr>,
492        arrangements: ArrangementFlavor<S, T>,
493    ) -> Self {
494        let mut arranged = BTreeMap::new();
495        arranged.insert(exprs, arrangements);
496        Self {
497            collection: None,
498            arranged,
499        }
500    }
501
502    /// Inserts arrangements by the columns on which they are keyed.
503    pub fn from_columns<I: IntoIterator<Item = usize>>(
504        columns: I,
505        arrangements: ArrangementFlavor<S, T>,
506    ) -> Self {
507        let mut keys = Vec::new();
508        for column in columns {
509            keys.push(MirScalarExpr::Column(column));
510        }
511        Self::from_expressions(keys, arrangements)
512    }
513
514    /// The scope containing the collection bundle.
515    pub fn scope(&self) -> S {
516        if let Some((oks, _errs)) = &self.collection {
517            oks.inner.scope()
518        } else {
519            self.arranged
520                .values()
521                .next()
522                .expect("Must contain a valid collection")
523                .scope()
524        }
525    }
526
527    /// Brings the collection bundle into a region.
528    pub fn enter_region<'a>(
529        &self,
530        region: &Child<'a, S, S::Timestamp>,
531    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
532        CollectionBundle {
533            collection: self
534                .collection
535                .as_ref()
536                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
537            arranged: self
538                .arranged
539                .iter()
540                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
541                .collect(),
542        }
543    }
544}
545
546impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
547where
548    T: Timestamp + Lattice + Columnation,
549    S::Timestamp: Lattice + Refines<T> + Columnation,
550{
551    /// Extracts the collection bundle from a region.
552    pub fn leave_region(&self) -> CollectionBundle<S, T> {
553        CollectionBundle {
554            collection: self
555                .collection
556                .as_ref()
557                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
558            arranged: self
559                .arranged
560                .iter()
561                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
562                .collect(),
563        }
564    }
565}
566
567impl<S: Scope, T> CollectionBundle<S, T>
568where
569    T: Timestamp + Lattice + Columnation,
570    S::Timestamp: Lattice + Refines<T> + Columnation,
571{
572    /// Asserts that the arrangement for a specific key
573    /// (or the raw collection for no key) exists,
574    /// and returns the corresponding collection.
575    ///
576    /// This returns the collection as-is, without
577    /// doing any unthinning transformation.
578    /// Therefore, it should be used when the appropriate transformation
579    /// was planned as part of a following MFP.
580    ///
581    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
582    /// the fueled `flat_map` or `as_collection` method, depending on the flag
583    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
584    pub fn as_specific_collection(
585        &self,
586        key: Option<&[MirScalarExpr]>,
587        config_set: &ConfigSet,
588    ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>) {
589        // Any operator that uses this method was told to use a particular
590        // collection during LIR planning, where we should have made
591        // sure that that collection exists.
592        //
593        // If it doesn't, we panic.
594        match key {
595            None => self
596                .collection
597                .clone()
598                .expect("The unarranged collection doesn't exist."),
599            Some(key) => {
600                let arranged = self.arranged.get(key).unwrap_or_else(|| {
601                    panic!("The collection arranged by {:?} doesn't exist.", key)
602                });
603                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
604                    // Decode all columns, pass max_demand as usize::MAX.
605                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
606                        Some((SharedRow::pack(borrow.iter()), t, r))
607                    });
608                    (ok.as_collection(), err)
609                } else {
610                    #[allow(deprecated)]
611                    arranged.as_collection()
612                }
613            }
614        }
615    }
616
617    /// Constructs and applies logic to elements of a collection and returns the results.
618    ///
619    /// The function applies `logic` on elements. The logic conceptually receives
620    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
621    ///
622    /// If `key_val` is set, this is a promise that `logic` will produce no results on
623    /// records for which the key does not evaluate to the value. This is used when we
624    /// have an arrangement by that key to leap directly to exactly those records.
625    /// It is important that `logic` still guard against data that does not satisfy
626    /// this constraint, as this method does not statically know that it will have
627    /// that arrangement.
628    ///
629    /// The `max_demand` parameter limits the number of columns decoded from the
630    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
631    /// decode all columns.
632    pub fn flat_map<D, I, L>(
633        &self,
634        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
635        max_demand: usize,
636        mut logic: L,
637    ) -> (Stream<S, I::Item>, Collection<S, DataflowError, Diff>)
638    where
639        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
640        D: Data,
641        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
642    {
643        // If `key_val` is set, we should have to use the corresponding arrangement.
644        // If there isn't one, that implies an error in the contract between
645        // key-production and available arrangements.
646        if let Some((key, val)) = key_val {
647            self.arrangement(&key)
648                .expect("Should have ensured during planning that this arrangement exists.")
649                .flat_map(val, max_demand, logic)
650        } else {
651            use timely::dataflow::operators::Map;
652            let (oks, errs) = self
653                .collection
654                .clone()
655                .expect("Invariant violated: CollectionBundle contains no collection.");
656            let mut datums = DatumVec::new();
657            let oks = oks.inner.flat_map(move |(v, t, d)| {
658                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
659            });
660            (oks, errs)
661        }
662    }
663
664    /// Factored out common logic for using literal keys in general traces.
665    ///
666    /// This logic is sufficiently interesting that we want to write it only
667    /// once, and thereby avoid any skew in the two uses of the logic.
668    ///
669    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
670    /// where key and value are potentially specialized, but convertible into rows.
671    fn flat_map_core<Tr, K, D, I, L>(
672        trace: &Arranged<S, Tr>,
673        key: Option<K>,
674        mut logic: L,
675        refuel: usize,
676    ) -> Stream<S, I::Item>
677    where
678        for<'a> Tr::Key<'a>: ToDatumIter + IntoOwned<'a, Owned = K>,
679        for<'a> Tr::Val<'a>: ToDatumIter,
680        Tr: TraceReader<Time = S::Timestamp, Diff = mz_repr::Diff> + Clone + 'static,
681        K: PartialEq + 'static,
682        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
683        D: Data,
684        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
685    {
686        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
687
688        let mode = if key.is_some() { "index" } else { "scan" };
689        let name = format!("ArrangementFlatMap({})", mode);
690        use timely::dataflow::operators::Operator;
691        trace
692            .stream
693            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
694                // Acquire an activator to reschedule the operator when it has unfinished work.
695                let activator = trace.stream.scope().activator_for(info.address);
696                // Maintain a list of work to do, cursor to navigate and process.
697                let mut todo = std::collections::VecDeque::new();
698                move |input, output| {
699                    // First, dequeue all batches.
700                    input.for_each(|time, data| {
701                        let capability = time.retain();
702                        for batch in data.iter() {
703                            // enqueue a capability, cursor, and batch.
704                            todo.push_back(PendingWork::new(
705                                capability.clone(),
706                                batch.cursor(),
707                                batch.clone(),
708                            ));
709                        }
710                    });
711
712                    // Second, make progress on `todo`.
713                    let mut fuel = refuel;
714                    while !todo.is_empty() && fuel > 0 {
715                        todo.front_mut()
716                            .unwrap()
717                            .do_work(&key, &mut logic, &mut fuel, output);
718                        if fuel > 0 {
719                            todo.pop_front();
720                        }
721                    }
722                    // If we have not finished all work, re-activate the operator.
723                    if !todo.is_empty() {
724                        activator.activate();
725                    }
726                }
727            })
728    }
729
730    /// Look up an arrangement by the expressions that form the key.
731    ///
732    /// The result may be `None` if no such arrangement exists, or it may be one of many
733    /// "arrangement flavors" that represent the types of arranged data we might have.
734    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
735        self.arranged.get(key).map(|x| x.clone())
736    }
737}
738
739impl<S, T> CollectionBundle<S, T>
740where
741    T: Timestamp + Lattice + Columnation,
742    S: Scope,
743    S::Timestamp: Refines<T> + RenderTimestamp,
744    <S::Timestamp as Columnar>::Container: Clone + Send,
745    for<'a> <S::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
746{
747    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
748    ///
749    /// This operator is able to apply the logic of `mfp` early, which can substantially
750    /// reduce the amount of data produced when `mfp` is non-trivial.
751    ///
752    /// The `key_val` argument, when present, indicates that a specific arrangement should
753    /// be used, and if, in addition, the `val` component is present,
754    /// that we can seek to the supplied row.
755    pub fn as_collection_core(
756        &self,
757        mut mfp: MapFilterProject,
758        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
759        until: Antichain<mz_repr::Timestamp>,
760        config_set: &ConfigSet,
761    ) -> (
762        Collection<S, mz_repr::Row, Diff>,
763        Collection<S, DataflowError, Diff>,
764    ) {
765        mfp.optimize();
766        let mfp_plan = mfp.clone().into_plan().unwrap();
767
768        // If the MFP is trivial, we can just call `as_collection`.
769        // In the case that we weren't going to apply the `key_val` optimization,
770        // this path results in a slightly smaller and faster
771        // dataflow graph, and is intended to fix
772        // https://github.com/MaterializeInc/database-issues/issues/3111
773        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
774            true
775        } else {
776            false
777        };
778
779        if mfp_plan.is_identity() && !has_key_val {
780            let key = key_val.map(|(k, _v)| k);
781            return self.as_specific_collection(key.as_deref(), config_set);
782        }
783
784        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
785        mfp.permute_fn(|c| c, max_demand);
786        mfp.optimize();
787        let mfp_plan = mfp.into_plan().unwrap();
788
789        let mut datum_vec = DatumVec::new();
790        // Wrap in an `Rc` so that lifetimes work out.
791        let until = std::rc::Rc::new(until);
792
793        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
794            let binding = SharedRow::get();
795            let mut row_builder = binding.borrow_mut();
796            let until = std::rc::Rc::clone(&until);
797            let temp_storage = RowArena::new();
798            let row_iter = row_datums.iter();
799            let mut datums_local = datum_vec.borrow();
800            datums_local.extend(row_iter);
801            let time = time.clone();
802            let event_time = time.event_time();
803            mfp_plan
804                .evaluate(
805                    &mut datums_local,
806                    &temp_storage,
807                    event_time,
808                    diff.clone(),
809                    move |time| !until.less_equal(time),
810                    &mut row_builder,
811                )
812                .map(move |x| match x {
813                    Ok((row, event_time, diff)) => {
814                        // Copy the whole time, and re-populate event time.
815                        let mut time: S::Timestamp = time.clone();
816                        *time.event_time_mut() = event_time;
817                        (Ok(row), time, diff)
818                    }
819                    Err((e, event_time, diff)) => {
820                        // Copy the whole time, and re-populate event time.
821                        let mut time: S::Timestamp = time.clone();
822                        *time.event_time_mut() = event_time;
823                        (Err(e), time, diff)
824                    }
825                })
826        });
827
828        use differential_dataflow::AsCollection;
829        let (oks, errs) = stream
830            .as_collection()
831            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
832                "OkErr",
833                |x| x,
834            );
835
836        (oks, errors.concat(&errs))
837    }
838    pub fn ensure_collections(
839        mut self,
840        collections: AvailableCollections,
841        input_key: Option<Vec<MirScalarExpr>>,
842        input_mfp: MapFilterProject,
843        until: Antichain<mz_repr::Timestamp>,
844        config_set: &ConfigSet,
845    ) -> Self {
846        if collections == Default::default() {
847            return self;
848        }
849        // Cache collection to avoid reforming it each time.
850        //
851        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
852        // as the `map_fallible` that follows could be run against an arrangement itself.
853        //
854        // Note(btv): If we ever do that, we would then only need to make the raw collection here
855        // if `collections.raw` is true.
856
857        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
858        let form_raw_collection = collections.raw
859            || collections
860                .arranged
861                .iter()
862                .any(|(key, _, _)| !self.arranged.contains_key(key));
863        if form_raw_collection && self.collection.is_none() {
864            self.collection = Some(self.as_collection_core(
865                input_mfp,
866                input_key.map(|k| (k, None)),
867                until,
868                config_set,
869            ));
870        }
871        for (key, _, thinning) in collections.arranged {
872            if !self.arranged.contains_key(&key) {
873                // TODO: Consider allowing more expressive names.
874                let name = format!("ArrangeBy[{:?}]", key);
875
876                let (oks, errs) = self
877                    .collection
878                    .clone()
879                    .expect("Collection constructed above");
880                let (oks, errs_keyed) =
881                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
882                let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
883                let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
884                    &format!("{}-errors", name),
885                );
886                self.arranged
887                    .insert(key, ArrangementFlavor::Local(oks, errs));
888            }
889        }
890        self
891    }
892
893    /// Builds an arrangement from a collection, using the specified key and value thinning.
894    ///
895    /// The arrangement's key is based on the `key` expressions, and the value the input with
896    /// the `thinning` applied to it. It selects which of the input columns are included in the
897    /// value of the arrangement. The thinning is in support of permuting arrangements such that
898    /// columns in the key are not included in the value.
899    fn arrange_collection(
900        name: &String,
901        oks: Collection<S, Row, Diff>,
902        key: Vec<MirScalarExpr>,
903        thinning: Vec<usize>,
904    ) -> (
905        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
906        Collection<S, DataflowError, Diff>,
907    ) {
908        // The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
909        // for the ok stream. The `map_fallible` cannot be used here because the closure cannot
910        // return references, which is what we need to push into columnar streams. Instead, we use
911        // a bespoke operator that also optimizes reuse of allocations across individual updates.
912        let (oks, errs) = oks
913            .inner
914            .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
915                Pipeline,
916                "FormArrangementKey",
917                move |_, _| {
918                    Box::new(move |input, ok, err| {
919                        let mut key_buf = Row::default();
920                        let mut val_buf = Row::default();
921                        let mut datums = DatumVec::new();
922                        let mut temp_storage = RowArena::new();
923                        while let Some((time, data)) = input.next() {
924                            let mut ok_session = ok.session_with_builder(&time);
925                            let mut err_session = err.session(&time);
926                            for (row, time, diff) in data.iter() {
927                                temp_storage.clear();
928                                let datums = datums.borrow_with(row);
929                                let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
930                                match key_buf.packer().try_extend(key_iter) {
931                                    Ok(()) => {
932                                        let val_datum_iter = thinning.iter().map(|c| datums[*c]);
933                                        val_buf.packer().extend(val_datum_iter);
934                                        ok_session.give(((&*key_buf, &*val_buf), time, diff));
935                                    }
936                                    Err(e) => {
937                                        err_session.give((e.into(), time.clone(), *diff));
938                                    }
939                                }
940                            }
941                        }
942                    })
943                },
944            );
945        let oks = oks
946            .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
947                ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
948            );
949        (oks, errs.as_collection())
950    }
951}
952
953struct PendingWork<C>
954where
955    C: Cursor,
956    C::Time: Timestamp,
957{
958    capability: Capability<C::Time>,
959    cursor: C,
960    batch: C::Storage,
961}
962
963impl<C> PendingWork<C>
964where
965    C: Cursor,
966    C::Time: Timestamp,
967{
968    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
969    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
970        Self {
971            capability,
972            cursor,
973            batch,
974        }
975    }
976    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
977    fn do_work<I, D, L, K>(
978        &mut self,
979        key: &Option<K>,
980        logic: &mut L,
981        fuel: &mut usize,
982        output: &mut OutputHandleCore<
983            '_,
984            C::Time,
985            ConsolidatingContainerBuilder<Vec<I::Item>>,
986            timely::dataflow::channels::pushers::Tee<C::Time, Vec<I::Item>>,
987        >,
988    ) where
989        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
990        D: Data,
991        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
992        K: PartialEq + Sized,
993        for<'a> C::Key<'a>: IntoOwned<'a, Owned = K>,
994    {
995        use differential_dataflow::consolidation::consolidate;
996
997        // Attempt to make progress on this batch.
998        let mut work: usize = 0;
999        let mut session = output.session_with_builder(&self.capability);
1000        let mut buffer = Vec::new();
1001        if let Some(key) = key {
1002            if self
1003                .cursor
1004                .get_key(&self.batch)
1005                .map(|k| k == IntoOwned::borrow_as(key))
1006                != Some(true)
1007            {
1008                self.cursor.seek_key(&self.batch, IntoOwned::borrow_as(key));
1009            }
1010            if self
1011                .cursor
1012                .get_key(&self.batch)
1013                .map(|k| k == IntoOwned::borrow_as(key))
1014                == Some(true)
1015            {
1016                let key = self.cursor.key(&self.batch);
1017                while let Some(val) = self.cursor.get_val(&self.batch) {
1018                    self.cursor.map_times(&self.batch, |time, diff| {
1019                        buffer.push((time.into_owned(), diff.into_owned()));
1020                    });
1021                    consolidate(&mut buffer);
1022                    for (time, diff) in buffer.drain(..) {
1023                        for datum in logic(key, val, time, diff) {
1024                            session.give(datum);
1025                            work += 1;
1026                        }
1027                    }
1028                    self.cursor.step_val(&self.batch);
1029                    if work >= *fuel {
1030                        *fuel = 0;
1031                        return;
1032                    }
1033                }
1034            }
1035        } else {
1036            while let Some(key) = self.cursor.get_key(&self.batch) {
1037                while let Some(val) = self.cursor.get_val(&self.batch) {
1038                    self.cursor.map_times(&self.batch, |time, diff| {
1039                        buffer.push((time.into_owned(), diff.into_owned()));
1040                    });
1041                    consolidate(&mut buffer);
1042                    for (time, diff) in buffer.drain(..) {
1043                        for datum in logic(key, val, time, diff) {
1044                            session.give(datum);
1045                            work += 1;
1046                        }
1047                    }
1048                    self.cursor.step_val(&self.batch);
1049                    if work >= *fuel {
1050                        *fuel = 0;
1051                        return;
1052                    }
1053                }
1054                self.cursor.step_key(&self.batch);
1055            }
1056        }
1057        *fuel -= work;
1058    }
1059}