mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
23use mz_compute_types::plan::AvailableCollections;
24use mz_dyncfg::ConfigSet;
25use mz_expr::{Id, MapFilterProject, MirScalarExpr};
26use mz_ore::soft_assert_or_log;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
29use mz_storage_types::controller::CollectionMetadata;
30use mz_storage_types::errors::DataflowError;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
33use mz_timely_util::operator::{CollectionExt, StreamExt};
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::Capability;
37use timely::dataflow::operators::generic::OutputBuilderSession;
38use timely::dataflow::scopes::Child;
39use timely::dataflow::{Scope, Stream};
40use timely::progress::timestamp::Refines;
41use timely::progress::{Antichain, Timestamp};
42
43use crate::compute_state::ComputeState;
44use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
45use crate::render::errors::ErrorLogger;
46use crate::render::{LinearJoinSpec, RenderTimestamp};
47use crate::row_spine::{DatumSeq, RowRowBuilder};
48use crate::typedefs::{
49    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, MzTimestamp, RowRowAgent, RowRowEnter,
50    RowRowSpine,
51};
52
53/// Dataflow-local collections and arrangements.
54///
55/// A context means to wrap available data assets and present them in an easy-to-use manner.
56/// These assets include dataflow-local collections and arrangements, as well as imported
57/// arrangements from outside the dataflow.
58///
59/// Context has two timestamp types, one from `S::Timestamp` and one from `T`, where the
60/// former must refine the latter. The former is the timestamp used by the scope in question,
61/// and the latter is the timestamp of imported traces. The two may be different in the case
62/// of regions or iteration.
63pub struct Context<S: Scope, T = mz_repr::Timestamp>
64where
65    T: MzTimestamp,
66    S::Timestamp: MzTimestamp + Refines<T>,
67{
68    /// The scope within which all managed collections exist.
69    ///
70    /// It is an error to add any collections not contained in this scope.
71    pub(crate) scope: S,
72    /// The debug name of the dataflow associated with this context.
73    pub debug_name: String,
74    /// The Timely ID of the dataflow associated with this context.
75    pub dataflow_id: usize,
76    /// The collection IDs of exports of the dataflow associated with this context.
77    pub export_ids: Vec<GlobalId>,
78    /// Frontier before which updates should not be emitted.
79    ///
80    /// We *must* apply it to sinks, to ensure correct outputs.
81    /// We *should* apply it to sources and imported traces, because it improves performance.
82    pub as_of_frontier: Antichain<T>,
83    /// Frontier after which updates should not be emitted.
84    /// Used to limit the amount of work done when appropriate.
85    pub until: Antichain<T>,
86    /// Bindings of identifiers to collections.
87    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
88    /// The logger, from Timely's logging framework, if logs are enabled.
89    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
90    /// Specification for rendering linear joins.
91    pub(super) linear_join_spec: LinearJoinSpec,
92    /// The expiration time for dataflows in this context. The output's frontier should never advance
93    /// past this frontier, except the empty frontier.
94    pub dataflow_expiration: Antichain<T>,
95    /// The config set for this context.
96    pub config_set: Rc<ConfigSet>,
97}
98
99impl<S: Scope> Context<S>
100where
101    S::Timestamp: MzTimestamp + Refines<mz_repr::Timestamp>,
102{
103    /// Creates a new empty Context.
104    pub fn for_dataflow_in<Plan>(
105        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
106        scope: S,
107        compute_state: &ComputeState,
108        until: Antichain<mz_repr::Timestamp>,
109        dataflow_expiration: Antichain<mz_repr::Timestamp>,
110    ) -> Self {
111        use mz_ore::collections::CollectionExt as IteratorExt;
112        let dataflow_id = *scope.addr().into_first();
113        let as_of_frontier = dataflow
114            .as_of
115            .clone()
116            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
117
118        let export_ids = dataflow.export_ids().collect();
119
120        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
121        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
122        // want to reconsider in the future.
123        let compute_logger = if dataflow.is_transient() {
124            None
125        } else {
126            compute_state.compute_logger.clone()
127        };
128
129        Self {
130            scope,
131            debug_name: dataflow.debug_name.clone(),
132            dataflow_id,
133            export_ids,
134            as_of_frontier,
135            until,
136            bindings: BTreeMap::new(),
137            compute_logger,
138            linear_join_spec: compute_state.linear_join_spec,
139            dataflow_expiration,
140            config_set: Rc::clone(&compute_state.worker_config),
141        }
142    }
143}
144
145impl<S: Scope, T> Context<S, T>
146where
147    T: MzTimestamp,
148    S::Timestamp: MzTimestamp + Refines<T>,
149{
150    /// Insert a collection bundle by an identifier.
151    ///
152    /// This is expected to be used to install external collections (sources, indexes, other views),
153    /// as well as for `Let` bindings of local collections.
154    pub fn insert_id(
155        &mut self,
156        id: Id,
157        collection: CollectionBundle<S, T>,
158    ) -> Option<CollectionBundle<S, T>> {
159        self.bindings.insert(id, collection)
160    }
161    /// Remove a collection bundle by an identifier.
162    ///
163    /// The primary use of this method is uninstalling `Let` bindings.
164    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>> {
165        self.bindings.remove(&id)
166    }
167    /// Melds a collection bundle to whatever exists.
168    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>) {
169        if !self.bindings.contains_key(&id) {
170            self.bindings.insert(id, collection);
171        } else {
172            let binding = self
173                .bindings
174                .get_mut(&id)
175                .expect("Binding verified to exist");
176            if collection.collection.is_some() {
177                binding.collection = collection.collection;
178            }
179            for (key, flavor) in collection.arranged.into_iter() {
180                binding.arranged.insert(key, flavor);
181            }
182        }
183    }
184    /// Look up a collection bundle by an identifier.
185    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>> {
186        self.bindings.get(&id).cloned()
187    }
188
189    pub(super) fn error_logger(&self) -> ErrorLogger {
190        ErrorLogger::new(self.debug_name.clone())
191    }
192}
193
194impl<S: Scope, T> Context<S, T>
195where
196    T: MzTimestamp,
197    S::Timestamp: MzTimestamp + Refines<T>,
198{
199    /// Brings the underlying arrangements and collections into a region.
200    pub fn enter_region<'a>(
201        &self,
202        region: &Child<'a, S, S::Timestamp>,
203        bindings: Option<&std::collections::BTreeSet<Id>>,
204    ) -> Context<Child<'a, S, S::Timestamp>, T> {
205        let bindings = self
206            .bindings
207            .iter()
208            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
209            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
210            .collect();
211
212        Context {
213            scope: region.clone(),
214            debug_name: self.debug_name.clone(),
215            dataflow_id: self.dataflow_id.clone(),
216            export_ids: self.export_ids.clone(),
217            as_of_frontier: self.as_of_frontier.clone(),
218            until: self.until.clone(),
219            compute_logger: self.compute_logger.clone(),
220            linear_join_spec: self.linear_join_spec.clone(),
221            bindings,
222            dataflow_expiration: self.dataflow_expiration.clone(),
223            config_set: Rc::clone(&self.config_set),
224        }
225    }
226}
227
228/// Describes flavor of arrangement: local or imported trace.
229#[derive(Clone)]
230pub enum ArrangementFlavor<S: Scope, T = mz_repr::Timestamp>
231where
232    T: MzTimestamp,
233    S::Timestamp: MzTimestamp + Refines<T>,
234{
235    /// A dataflow-local arrangement.
236    Local(
237        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
238        Arranged<S, ErrAgent<S::Timestamp, Diff>>,
239    ),
240    /// An imported trace from outside the dataflow.
241    ///
242    /// The `GlobalId` identifier exists so that exports of this same trace
243    /// can refer back to and depend on the original instance.
244    Trace(
245        GlobalId,
246        Arranged<S, RowRowEnter<T, Diff, S::Timestamp>>,
247        Arranged<S, ErrEnter<T, S::Timestamp>>,
248    ),
249}
250
251impl<S: Scope, T> ArrangementFlavor<S, T>
252where
253    T: MzTimestamp,
254    S::Timestamp: MzTimestamp + Refines<T>,
255{
256    /// Presents `self` as a stream of updates.
257    ///
258    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
259    ///
260    /// This method presents the contents as they are, without further computation.
261    /// If you have logic that could be applied to each record, consider using the
262    /// `flat_map` methods which allows this and can reduce the work done.
263    #[deprecated(note = "Use `flat_map` instead.")]
264    pub fn as_collection(
265        &self,
266    ) -> (
267        VecCollection<S, Row, Diff>,
268        VecCollection<S, DataflowError, Diff>,
269    ) {
270        let mut datums = DatumVec::new();
271        let logic = move |k: DatumSeq, v: DatumSeq| {
272            let mut datums_borrow = datums.borrow();
273            datums_borrow.extend(k);
274            datums_borrow.extend(v);
275            SharedRow::pack(&**datums_borrow)
276        };
277        match &self {
278            ArrangementFlavor::Local(oks, errs) => (
279                oks.as_collection(logic),
280                errs.as_collection(|k, &()| k.clone()),
281            ),
282            ArrangementFlavor::Trace(_, oks, errs) => (
283                oks.as_collection(logic),
284                errs.as_collection(|k, &()| k.clone()),
285            ),
286        }
287    }
288
289    /// Constructs and applies logic to elements of `self` and returns the results.
290    ///
291    /// The `logic` receives a vector of datums, a timestamp, and a diff, and produces
292    /// an iterator of `(D, S::Timestamp, Diff)` updates.
293    ///
294    /// If `key` is set, this is a promise that `logic` will produce no results on
295    /// records for which the key does not evaluate to the value. This is used to
296    /// leap directly to exactly those records.
297    ///
298    /// The `max_demand` parameter limits the number of columns decoded from the
299    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
300    /// decode all columns.
301    pub fn flat_map<D, I, L>(
302        &self,
303        key: Option<&Row>,
304        max_demand: usize,
305        mut logic: L,
306    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
307    where
308        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
309        D: Data,
310        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, S::Timestamp, Diff) -> I + 'static,
311    {
312        // Set a number of tuples after which the operator should yield.
313        // This allows us to remain responsive even when enumerating a substantial
314        // arrangement, as well as provides time to accumulate our produced output.
315        let refuel = 1000000;
316
317        let mut datums = DatumVec::new();
318        let logic = move |k: DatumSeq, v: DatumSeq, t, d| {
319            let mut datums_borrow = datums.borrow();
320            datums_borrow.extend(k.to_datum_iter().take(max_demand));
321            let max_demand = max_demand.saturating_sub(datums_borrow.len());
322            datums_borrow.extend(v.to_datum_iter().take(max_demand));
323            logic(&mut datums_borrow, t, d)
324        };
325
326        match &self {
327            ArrangementFlavor::Local(oks, errs) => {
328                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
329                let errs = errs.as_collection(|k, &()| k.clone());
330                (oks, errs)
331            }
332            ArrangementFlavor::Trace(_, oks, errs) => {
333                let oks = CollectionBundle::<S, T>::flat_map_core(oks, key, logic, refuel);
334                let errs = errs.as_collection(|k, &()| k.clone());
335                (oks, errs)
336            }
337        }
338    }
339}
340impl<S: Scope, T> ArrangementFlavor<S, T>
341where
342    T: MzTimestamp,
343    S::Timestamp: MzTimestamp + Refines<T>,
344{
345    /// The scope containing the collection bundle.
346    pub fn scope(&self) -> S {
347        match self {
348            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
349            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
350        }
351    }
352
353    /// Brings the arrangement flavor into a region.
354    pub fn enter_region<'a>(
355        &self,
356        region: &Child<'a, S, S::Timestamp>,
357    ) -> ArrangementFlavor<Child<'a, S, S::Timestamp>, T> {
358        match self {
359            ArrangementFlavor::Local(oks, errs) => {
360                ArrangementFlavor::Local(oks.enter_region(region), errs.enter_region(region))
361            }
362            ArrangementFlavor::Trace(gid, oks, errs) => {
363                ArrangementFlavor::Trace(*gid, oks.enter_region(region), errs.enter_region(region))
364            }
365        }
366    }
367}
368impl<'a, S: Scope, T> ArrangementFlavor<Child<'a, S, S::Timestamp>, T>
369where
370    T: MzTimestamp,
371    S::Timestamp: MzTimestamp + Refines<T>,
372{
373    /// Extracts the arrangement flavor from a region.
374    pub fn leave_region(&self) -> ArrangementFlavor<S, T> {
375        match self {
376            ArrangementFlavor::Local(oks, errs) => {
377                ArrangementFlavor::Local(oks.leave_region(), errs.leave_region())
378            }
379            ArrangementFlavor::Trace(gid, oks, errs) => {
380                ArrangementFlavor::Trace(*gid, oks.leave_region(), errs.leave_region())
381            }
382        }
383    }
384}
385
386/// A bundle of the various ways a collection can be represented.
387///
388/// This type maintains the invariant that it does contain at least one valid
389/// source of data, either a collection or at least one arrangement.
390#[derive(Clone)]
391pub struct CollectionBundle<S: Scope, T = mz_repr::Timestamp>
392where
393    T: MzTimestamp,
394    S::Timestamp: MzTimestamp + Refines<T>,
395{
396    pub collection: Option<(
397        VecCollection<S, Row, Diff>,
398        VecCollection<S, DataflowError, Diff>,
399    )>,
400    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<S, T>>,
401}
402
403impl<S: Scope, T> CollectionBundle<S, T>
404where
405    T: MzTimestamp,
406    S::Timestamp: MzTimestamp + Refines<T>,
407{
408    /// Construct a new collection bundle from update streams.
409    pub fn from_collections(
410        oks: VecCollection<S, Row, Diff>,
411        errs: VecCollection<S, DataflowError, Diff>,
412    ) -> Self {
413        Self {
414            collection: Some((oks, errs)),
415            arranged: BTreeMap::default(),
416        }
417    }
418
419    /// Inserts arrangements by the expressions on which they are keyed.
420    pub fn from_expressions(
421        exprs: Vec<MirScalarExpr>,
422        arrangements: ArrangementFlavor<S, T>,
423    ) -> Self {
424        let mut arranged = BTreeMap::new();
425        arranged.insert(exprs, arrangements);
426        Self {
427            collection: None,
428            arranged,
429        }
430    }
431
432    /// Inserts arrangements by the columns on which they are keyed.
433    pub fn from_columns<I: IntoIterator<Item = usize>>(
434        columns: I,
435        arrangements: ArrangementFlavor<S, T>,
436    ) -> Self {
437        let mut keys = Vec::new();
438        for column in columns {
439            keys.push(MirScalarExpr::column(column));
440        }
441        Self::from_expressions(keys, arrangements)
442    }
443
444    /// The scope containing the collection bundle.
445    pub fn scope(&self) -> S {
446        if let Some((oks, _errs)) = &self.collection {
447            oks.inner.scope()
448        } else {
449            self.arranged
450                .values()
451                .next()
452                .expect("Must contain a valid collection")
453                .scope()
454        }
455    }
456
457    /// Brings the collection bundle into a region.
458    pub fn enter_region<'a>(
459        &self,
460        region: &Child<'a, S, S::Timestamp>,
461    ) -> CollectionBundle<Child<'a, S, S::Timestamp>, T> {
462        CollectionBundle {
463            collection: self
464                .collection
465                .as_ref()
466                .map(|(oks, errs)| (oks.enter_region(region), errs.enter_region(region))),
467            arranged: self
468                .arranged
469                .iter()
470                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
471                .collect(),
472        }
473    }
474}
475
476impl<'a, S: Scope, T> CollectionBundle<Child<'a, S, S::Timestamp>, T>
477where
478    T: MzTimestamp,
479    S::Timestamp: MzTimestamp + Refines<T>,
480{
481    /// Extracts the collection bundle from a region.
482    pub fn leave_region(&self) -> CollectionBundle<S, T> {
483        CollectionBundle {
484            collection: self
485                .collection
486                .as_ref()
487                .map(|(oks, errs)| (oks.leave_region(), errs.leave_region())),
488            arranged: self
489                .arranged
490                .iter()
491                .map(|(key, bundle)| (key.clone(), bundle.leave_region()))
492                .collect(),
493        }
494    }
495}
496
497impl<S: Scope, T> CollectionBundle<S, T>
498where
499    T: MzTimestamp,
500    S::Timestamp: MzTimestamp + Refines<T>,
501{
502    /// Asserts that the arrangement for a specific key
503    /// (or the raw collection for no key) exists,
504    /// and returns the corresponding collection.
505    ///
506    /// This returns the collection as-is, without
507    /// doing any unthinning transformation.
508    /// Therefore, it should be used when the appropriate transformation
509    /// was planned as part of a following MFP.
510    ///
511    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
512    /// the fueled `flat_map` or `as_collection` method, depending on the flag
513    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
514    pub fn as_specific_collection(
515        &self,
516        key: Option<&[MirScalarExpr]>,
517        config_set: &ConfigSet,
518    ) -> (
519        VecCollection<S, Row, Diff>,
520        VecCollection<S, DataflowError, Diff>,
521    ) {
522        // Any operator that uses this method was told to use a particular
523        // collection during LIR planning, where we should have made
524        // sure that that collection exists.
525        //
526        // If it doesn't, we panic.
527        match key {
528            None => self
529                .collection
530                .clone()
531                .expect("The unarranged collection doesn't exist."),
532            Some(key) => {
533                let arranged = self.arranged.get(key).unwrap_or_else(|| {
534                    panic!("The collection arranged by {:?} doesn't exist.", key)
535                });
536                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
537                    // Decode all columns, pass max_demand as usize::MAX.
538                    let (ok, err) = arranged.flat_map(None, usize::MAX, |borrow, t, r| {
539                        Some((SharedRow::pack(borrow.iter()), t, r))
540                    });
541                    (ok.as_collection(), err)
542                } else {
543                    #[allow(deprecated)]
544                    arranged.as_collection()
545                }
546            }
547        }
548    }
549
550    /// Constructs and applies logic to elements of a collection and returns the results.
551    ///
552    /// The function applies `logic` on elements. The logic conceptually receives
553    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
554    ///
555    /// If `key_val` is set, this is a promise that `logic` will produce no results on
556    /// records for which the key does not evaluate to the value. This is used when we
557    /// have an arrangement by that key to leap directly to exactly those records.
558    /// It is important that `logic` still guard against data that does not satisfy
559    /// this constraint, as this method does not statically know that it will have
560    /// that arrangement.
561    ///
562    /// The `max_demand` parameter limits the number of columns decoded from the
563    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
564    /// decode all columns.
565    pub fn flat_map<D, I, L>(
566        &self,
567        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
568        max_demand: usize,
569        mut logic: L,
570    ) -> (Stream<S, I::Item>, VecCollection<S, DataflowError, Diff>)
571    where
572        I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
573        D: Data,
574        L: for<'a> FnMut(&'a mut DatumVecBorrow<'_>, S::Timestamp, Diff) -> I + 'static,
575    {
576        // If `key_val` is set, we should have to use the corresponding arrangement.
577        // If there isn't one, that implies an error in the contract between
578        // key-production and available arrangements.
579        if let Some((key, val)) = key_val {
580            self.arrangement(&key)
581                .expect("Should have ensured during planning that this arrangement exists.")
582                .flat_map(val.as_ref(), max_demand, logic)
583        } else {
584            use timely::dataflow::operators::Map;
585            let (oks, errs) = self
586                .collection
587                .clone()
588                .expect("Invariant violated: CollectionBundle contains no collection.");
589            let mut datums = DatumVec::new();
590            let oks = oks.inner.flat_map(move |(v, t, d)| {
591                logic(&mut datums.borrow_with_limit(&v, max_demand), t, d)
592            });
593            (oks, errs)
594        }
595    }
596
597    /// Factored out common logic for using literal keys in general traces.
598    ///
599    /// This logic is sufficiently interesting that we want to write it only
600    /// once, and thereby avoid any skew in the two uses of the logic.
601    ///
602    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
603    /// where key and value are potentially specialized, but convertible into rows.
604    fn flat_map_core<Tr, D, I, L>(
605        trace: &Arranged<S, Tr>,
606        key: Option<&Tr::KeyOwn>,
607        mut logic: L,
608        refuel: usize,
609    ) -> Stream<S, I::Item>
610    where
611        Tr: for<'a> TraceReader<
612                Key<'a>: ToDatumIter,
613                KeyOwn: PartialEq,
614                Val<'a>: ToDatumIter,
615                Time = S::Timestamp,
616                Diff = mz_repr::Diff,
617            > + Clone
618            + 'static,
619        I: IntoIterator<Item = (D, Tr::Time, Tr::Diff)>,
620        D: Data,
621        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, S::Timestamp, mz_repr::Diff) -> I + 'static,
622    {
623        use differential_dataflow::consolidation::ConsolidatingContainerBuilder as CB;
624
625        let mut key_con = Tr::KeyContainer::with_capacity(1);
626        if let Some(key) = &key {
627            key_con.push_own(key);
628        }
629        let mode = if key.is_some() { "index" } else { "scan" };
630        let name = format!("ArrangementFlatMap({})", mode);
631        use timely::dataflow::operators::Operator;
632        trace
633            .stream
634            .unary::<CB<_>, _, _, _>(Pipeline, &name, move |_, info| {
635                // Acquire an activator to reschedule the operator when it has unfinished work.
636                let activator = trace.stream.scope().activator_for(info.address);
637                // Maintain a list of work to do, cursor to navigate and process.
638                let mut todo = std::collections::VecDeque::new();
639                move |input, output| {
640                    let key = key_con.get(0);
641                    // First, dequeue all batches.
642                    input.for_each(|time, data| {
643                        let capability = time.retain();
644                        for batch in data.iter() {
645                            // enqueue a capability, cursor, and batch.
646                            todo.push_back(PendingWork::new(
647                                capability.clone(),
648                                batch.cursor(),
649                                batch.clone(),
650                            ));
651                        }
652                    });
653
654                    // Second, make progress on `todo`.
655                    let mut fuel = refuel;
656                    while !todo.is_empty() && fuel > 0 {
657                        todo.front_mut().unwrap().do_work(
658                            key.as_ref(),
659                            &mut logic,
660                            &mut fuel,
661                            output,
662                        );
663                        if fuel > 0 {
664                            todo.pop_front();
665                        }
666                    }
667                    // If we have not finished all work, re-activate the operator.
668                    if !todo.is_empty() {
669                        activator.activate();
670                    }
671                }
672            })
673    }
674
675    /// Look up an arrangement by the expressions that form the key.
676    ///
677    /// The result may be `None` if no such arrangement exists, or it may be one of many
678    /// "arrangement flavors" that represent the types of arranged data we might have.
679    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<S, T>> {
680        self.arranged.get(key).map(|x| x.clone())
681    }
682}
683
684impl<S, T> CollectionBundle<S, T>
685where
686    T: MzTimestamp,
687    S: Scope,
688    S::Timestamp: Refines<T> + RenderTimestamp,
689{
690    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
691    ///
692    /// This operator is able to apply the logic of `mfp` early, which can substantially
693    /// reduce the amount of data produced when `mfp` is non-trivial.
694    ///
695    /// The `key_val` argument, when present, indicates that a specific arrangement should
696    /// be used, and if, in addition, the `val` component is present,
697    /// that we can seek to the supplied row.
698    pub fn as_collection_core(
699        &self,
700        mut mfp: MapFilterProject,
701        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
702        until: Antichain<mz_repr::Timestamp>,
703        config_set: &ConfigSet,
704    ) -> (
705        VecCollection<S, mz_repr::Row, Diff>,
706        VecCollection<S, DataflowError, Diff>,
707    ) {
708        mfp.optimize();
709        let mfp_plan = mfp.clone().into_plan().unwrap();
710
711        // If the MFP is trivial, we can just call `as_collection`.
712        // In the case that we weren't going to apply the `key_val` optimization,
713        // this path results in a slightly smaller and faster
714        // dataflow graph, and is intended to fix
715        // https://github.com/MaterializeInc/database-issues/issues/3111
716        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
717            true
718        } else {
719            false
720        };
721
722        if mfp_plan.is_identity() && !has_key_val {
723            let key = key_val.map(|(k, _v)| k);
724            return self.as_specific_collection(key.as_deref(), config_set);
725        }
726
727        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
728        mfp.permute_fn(|c| c, max_demand);
729        mfp.optimize();
730        let mfp_plan = mfp.into_plan().unwrap();
731
732        let mut datum_vec = DatumVec::new();
733        // Wrap in an `Rc` so that lifetimes work out.
734        let until = std::rc::Rc::new(until);
735
736        let (stream, errors) = self.flat_map(key_val, max_demand, move |row_datums, time, diff| {
737            let mut row_builder = SharedRow::get();
738            let until = std::rc::Rc::clone(&until);
739            let temp_storage = RowArena::new();
740            let row_iter = row_datums.iter();
741            let mut datums_local = datum_vec.borrow();
742            datums_local.extend(row_iter);
743            let time = time.clone();
744            let event_time = time.event_time();
745            mfp_plan
746                .evaluate(
747                    &mut datums_local,
748                    &temp_storage,
749                    event_time,
750                    diff.clone(),
751                    move |time| !until.less_equal(time),
752                    &mut row_builder,
753                )
754                .map(move |x| match x {
755                    Ok((row, event_time, diff)) => {
756                        // Copy the whole time, and re-populate event time.
757                        let mut time: S::Timestamp = time.clone();
758                        *time.event_time_mut() = event_time;
759                        (Ok(row), time, diff)
760                    }
761                    Err((e, event_time, diff)) => {
762                        // Copy the whole time, and re-populate event time.
763                        let mut time: S::Timestamp = time.clone();
764                        *time.event_time_mut() = event_time;
765                        (Err(e), time, diff)
766                    }
767                })
768        });
769
770        use differential_dataflow::AsCollection;
771        let (oks, errs) = stream
772            .as_collection()
773            .map_fallible::<CapacityContainerBuilder<_>, CapacityContainerBuilder<_>, _, _, _>(
774                "OkErr",
775                |x| x,
776            );
777
778        (oks, errors.concat(&errs))
779    }
780    pub fn ensure_collections(
781        mut self,
782        collections: AvailableCollections,
783        input_key: Option<Vec<MirScalarExpr>>,
784        input_mfp: MapFilterProject,
785        until: Antichain<mz_repr::Timestamp>,
786        config_set: &ConfigSet,
787    ) -> Self {
788        if collections == Default::default() {
789            return self;
790        }
791        // Cache collection to avoid reforming it each time.
792        //
793        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
794        // as the `map_fallible` that follows could be run against an arrangement itself.
795        //
796        // Note(btv): If we ever do that, we would then only need to make the raw collection here
797        // if `collections.raw` is true.
798
799        for (key, _, _) in collections.arranged.iter() {
800            soft_assert_or_log!(
801                !self.arranged.contains_key(key),
802                "LIR ArrangeBy tried to create an existing arrangement"
803            );
804        }
805
806        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
807        let form_raw_collection = collections.raw
808            || collections
809                .arranged
810                .iter()
811                .any(|(key, _, _)| !self.arranged.contains_key(key));
812        if form_raw_collection && self.collection.is_none() {
813            self.collection = Some(self.as_collection_core(
814                input_mfp,
815                input_key.map(|k| (k, None)),
816                until,
817                config_set,
818            ));
819        }
820        for (key, _, thinning) in collections.arranged {
821            if !self.arranged.contains_key(&key) {
822                // TODO: Consider allowing more expressive names.
823                let name = format!("ArrangeBy[{:?}]", key);
824
825                let (oks, errs) = self
826                    .collection
827                    .clone()
828                    .expect("Collection constructed above");
829                let (oks, errs_keyed) =
830                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
831                let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into();
832                let errs = errs.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
833                    &format!("{}-errors", name),
834                );
835                self.arranged
836                    .insert(key, ArrangementFlavor::Local(oks, errs));
837            }
838        }
839        self
840    }
841
842    /// Builds an arrangement from a collection, using the specified key and value thinning.
843    ///
844    /// The arrangement's key is based on the `key` expressions, and the value the input with
845    /// the `thinning` applied to it. It selects which of the input columns are included in the
846    /// value of the arrangement. The thinning is in support of permuting arrangements such that
847    /// columns in the key are not included in the value.
848    fn arrange_collection(
849        name: &String,
850        oks: VecCollection<S, Row, Diff>,
851        key: Vec<MirScalarExpr>,
852        thinning: Vec<usize>,
853    ) -> (
854        Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
855        VecCollection<S, DataflowError, Diff>,
856    ) {
857        // The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
858        // for the ok stream. The `map_fallible` cannot be used here because the closure cannot
859        // return references, which is what we need to push into columnar streams. Instead, we use
860        // a bespoke operator that also optimizes reuse of allocations across individual updates.
861        let (oks, errs) = oks
862            .inner
863            .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
864                Pipeline,
865                "FormArrangementKey",
866                move |_, _| {
867                    Box::new(move |input, ok, err| {
868                        let mut key_buf = Row::default();
869                        let mut val_buf = Row::default();
870                        let mut datums = DatumVec::new();
871                        let mut temp_storage = RowArena::new();
872                        while let Some((time, data)) = input.next() {
873                            let mut ok_session = ok.session_with_builder(&time);
874                            let mut err_session = err.session(&time);
875                            for (row, time, diff) in data.iter() {
876                                temp_storage.clear();
877                                let datums = datums.borrow_with(row);
878                                let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
879                                match key_buf.packer().try_extend(key_iter) {
880                                    Ok(()) => {
881                                        let val_datum_iter = thinning.iter().map(|c| datums[*c]);
882                                        val_buf.packer().extend(val_datum_iter);
883                                        ok_session.give(((&*key_buf, &*val_buf), time, diff));
884                                    }
885                                    Err(e) => {
886                                        err_session.give((e.into(), time.clone(), *diff));
887                                    }
888                                }
889                            }
890                        }
891                    })
892                },
893            );
894        let oks = oks
895            .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
896                ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),name
897            );
898        (oks, errs.as_collection())
899    }
900}
901
902struct PendingWork<C>
903where
904    C: Cursor,
905{
906    capability: Capability<C::Time>,
907    cursor: C,
908    batch: C::Storage,
909}
910
impl<C> PendingWork<C>
where
    C: Cursor<KeyOwn: PartialEq + Sized>,
{
    /// Create a new bundle of pending work, from the capability, cursor, and backing storage.
    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
        Self {
            capability,
            cursor,
            batch,
        }
    }
    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to `output`.
    ///
    /// One unit of work is one item yielded by `logic` and given to the output session; an
    /// update for which `logic` yields nothing does not consume fuel.
    ///
    /// If `key` is `Some`, only values for that key are visited (seeking to it if the cursor
    /// is not already positioned there). Otherwise every remaining key in the batch is visited.
    /// For each value, its `(time, diff)` pairs are collected and passed through `consolidate`
    /// before `logic` is applied to each of them.
    ///
    /// Fuel accounting:
    /// * If the work done reaches `*fuel`, `*fuel` is set to `0` and the method returns with the
    ///   cursor already stepped past the last processed value, so the next call resumes there.
    /// * Otherwise all reachable values were processed and `*fuel` is reduced by the work done.
    ///
    /// NOTE(review): callers presumably treat `*fuel == 0` as "more work may remain". If the
    /// fuel runs out exactly on the final value, a further call will find nothing to do.
    ///
    /// NOTE(review): because fuel counts outputs rather than inputs, a `logic` that rarely
    /// yields (e.g. a selective filter) can walk the whole batch in a single call.
    fn do_work<I, D, L>(
        &mut self,
        key: Option<&C::Key<'_>>,
        logic: &mut L,
        fuel: &mut usize,
        output: &mut OutputBuilderSession<'_, C::Time, ConsolidatingContainerBuilder<Vec<I::Item>>>,
    ) where
        I: IntoIterator<Item = (D, C::Time, C::Diff)>,
        D: Data,
        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> I + 'static,
    {
        use differential_dataflow::consolidation::consolidate;

        // Attempt to make progress on this batch.
        let mut work: usize = 0;
        let mut session = output.session_with_builder(&self.capability);
        // Scratch space for the `(time, diff)` pairs of the current value.
        let mut buffer = Vec::new();
        if let Some(key) = key {
            // Reborrow the caller's key (presumably so its lifetime can unify with keys read
            // from `self.batch` in the comparisons below).
            let key = C::KeyContainer::reborrow(*key);
            // Only seek when not already positioned at `key`, e.g. when resuming a prior call.
            if self.cursor.get_key(&self.batch).map(|k| k == key) != Some(true) {
                self.cursor.seek_key(&self.batch, key);
            }
            // Re-check after seeking: the cursor may not be at `key` (e.g. if this batch does
            // not contain it), in which case there is nothing to do.
            if self.cursor.get_key(&self.batch).map(|k| k == key) == Some(true) {
                let key = self.cursor.key(&self.batch);
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Step before the fuel check so a resumed call does not repeat this value.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
            }
        } else {
            while let Some(key) = self.cursor.get_key(&self.batch) {
                while let Some(val) = self.cursor.get_val(&self.batch) {
                    self.cursor.map_times(&self.batch, |time, diff| {
                        buffer.push((C::owned_time(time), C::owned_diff(diff)));
                    });
                    consolidate(&mut buffer);
                    for (time, diff) in buffer.drain(..) {
                        for datum in logic(key, val, time, diff) {
                            session.give(datum);
                            work += 1;
                        }
                    }
                    // Step before the fuel check so a resumed call does not repeat this value.
                    // On early return the key is intentionally not stepped: the next call
                    // continues with the remaining values (if any) of the current key.
                    self.cursor.step_val(&self.batch);
                    if work >= *fuel {
                        *fuel = 0;
                        return;
                    }
                }
                self.cursor.step_key(&self.batch);
            }
        }
        // Cannot underflow: `work` only grows inside the value loops, and each increment is
        // followed by a check that returns early once `work >= *fuel`.
        *fuel -= work;
    }
}