Skip to main content

mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23    ENABLE_COLUMN_PAGED_BATCHER, ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION,
24    ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Eval, Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ExtendDatums;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::batcher;
34use mz_timely_util::columnar::builder::ColumnBuilder;
35use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange};
36use mz_timely_util::columnation::ColumnationChunker;
37use timely::ContainerBuilder;
38use timely::container::{CapacityContainerBuilder, PushInto};
39use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
40use timely::dataflow::operators::Capability;
41use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
42use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
43use timely::dataflow::{Scope, Stream};
44use timely::progress::operate::FrontierInterest;
45use timely::progress::{Antichain, Timestamp};
46
47use crate::compute_state::ComputeState;
48use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
49use crate::render::errors::{DataflowErrorSer, ErrorLogger};
50use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
51use crate::typedefs::{
52    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
53};
54use mz_row_spine::{DatumSeq, RowRowBuilder, RowRowColPagedBuilder};
55
/// Dataflow-local collections and arrangements.
///
/// A context wraps the data assets available while rendering a dataflow and presents them in
/// an easy-to-use manner. These assets include dataflow-local collections and arrangements, as
/// well as arrangements imported from outside the dataflow.
///
/// Context has a timestamp type `T`, which is the timestamp used by the scope in question.
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: Scope<'scope, T>,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    ///
    /// `for_dataflow_in` takes it from the first component of the scope's address.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Bindings of identifiers to collections.
    ///
    /// Holds both external collections (sources, indexes, other views) and `Let` bindings of
    /// local collections.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    ///
    /// `for_dataflow_in` leaves this as `None` for transient dataflows.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never
    /// advance past this frontier, except to the empty frontier.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// The config set for this context, shared (via `Rc`) with the worker's configuration.
    pub config_set: Rc<ConfigSet>,
}
94
95impl<'scope, T: RenderTimestamp> Context<'scope, T> {
96    /// Creates a new empty Context.
97    pub fn for_dataflow_in<Plan>(
98        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
99        scope: Scope<'scope, T>,
100        compute_state: &ComputeState,
101        until: Antichain<mz_repr::Timestamp>,
102        dataflow_expiration: Antichain<mz_repr::Timestamp>,
103    ) -> Self {
104        use mz_ore::collections::CollectionExt as IteratorExt;
105        let dataflow_id = *scope.addr().into_first();
106        let as_of_frontier = dataflow
107            .as_of
108            .clone()
109            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
110
111        let export_ids = dataflow.export_ids().collect();
112
113        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
114        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
115        // want to reconsider in the future.
116        let compute_logger = if dataflow.is_transient() {
117            None
118        } else {
119            compute_state.compute_logger.clone()
120        };
121
122        Self {
123            scope,
124            debug_name: dataflow.debug_name.clone(),
125            dataflow_id,
126            export_ids,
127            as_of_frontier,
128            until,
129            bindings: BTreeMap::new(),
130            compute_logger,
131            linear_join_spec: compute_state.linear_join_spec,
132            dataflow_expiration,
133            config_set: Rc::clone(&compute_state.worker_config),
134        }
135    }
136}
137
138impl<'scope, T: RenderTimestamp> Context<'scope, T> {
139    /// Insert a collection bundle by an identifier.
140    ///
141    /// This is expected to be used to install external collections (sources, indexes, other views),
142    /// as well as for `Let` bindings of local collections.
143    pub fn insert_id(
144        &mut self,
145        id: Id,
146        collection: CollectionBundle<'scope, T>,
147    ) -> Option<CollectionBundle<'scope, T>> {
148        self.bindings.insert(id, collection)
149    }
150    /// Remove a collection bundle by an identifier.
151    ///
152    /// The primary use of this method is uninstalling `Let` bindings.
153    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
154        self.bindings.remove(&id)
155    }
156    /// Melds a collection bundle to whatever exists.
157    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
158        if !self.bindings.contains_key(&id) {
159            self.bindings.insert(id, collection);
160        } else {
161            let binding = self
162                .bindings
163                .get_mut(&id)
164                .expect("Binding verified to exist");
165            if collection.collection.is_some() {
166                binding.collection = collection.collection;
167            }
168            for (key, flavor) in collection.arranged.into_iter() {
169                binding.arranged.insert(key, flavor);
170            }
171        }
172    }
173    /// Look up a collection bundle by an identifier.
174    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
175        self.bindings.get(&id).cloned()
176    }
177
178    pub(super) fn error_logger(&self) -> ErrorLogger {
179        ErrorLogger::new(self.debug_name.clone())
180    }
181}
182
183impl<'scope, T: RenderTimestamp> Context<'scope, T> {
184    /// Brings the underlying arrangements and collections into a region.
185    pub fn enter_region<'a>(
186        &self,
187        region: Scope<'a, T>,
188        bindings: Option<&std::collections::BTreeSet<Id>>,
189    ) -> Context<'a, T> {
190        let bindings = self
191            .bindings
192            .iter()
193            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
194            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
195            .collect();
196
197        Context {
198            scope: region,
199            debug_name: self.debug_name.clone(),
200            dataflow_id: self.dataflow_id.clone(),
201            export_ids: self.export_ids.clone(),
202            as_of_frontier: self.as_of_frontier.clone(),
203            until: self.until.clone(),
204            compute_logger: self.compute_logger.clone(),
205            linear_join_spec: self.linear_join_spec.clone(),
206            bindings,
207            dataflow_expiration: self.dataflow_expiration.clone(),
208            config_set: Rc::clone(&self.config_set),
209        }
210    }
211}
212
/// Describes flavor of arrangement: local or imported trace.
///
/// Each flavor carries two arrangements: one of `ok` rows and one of errors.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// A dataflow-local arrangement.
    ///
    /// Both the `ok` and error arrangements are built at the scope's timestamp `T`.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    ///
    /// The traces are keyed at `mz_repr::Timestamp` and wrapped in the `*Enter` types,
    /// presumably to present them at the scope's timestamp `T` (TODO confirm in `typedefs`).
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
231
232impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
233    /// Presents `self` as a stream of updates.
234    ///
235    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
236    ///
237    /// This method presents the contents as they are, without further computation.
238    /// If you have logic that could be applied to each record, consider using the
239    /// `flat_map` methods which allows this and can reduce the work done.
240    #[deprecated(note = "Use `flat_map` instead.")]
241    pub fn as_collection(
242        &self,
243    ) -> (
244        VecCollection<'scope, T, Row, Diff>,
245        VecCollection<'scope, T, DataflowErrorSer, Diff>,
246    ) {
247        let mut datums = DatumVec::new();
248        let logic = move |k: DatumSeq, v: DatumSeq| {
249            let temp_storage = RowArena::new();
250            let mut datums_borrow = datums.borrow();
251            k.extend_datums(&temp_storage, &mut datums_borrow, None);
252            v.extend_datums(&temp_storage, &mut datums_borrow, None);
253            SharedRow::pack(&**datums_borrow)
254        };
255        match &self {
256            ArrangementFlavor::Local(oks, errs) => (
257                oks.clone().as_collection(logic),
258                errs.clone().as_collection(|k, &()| k.clone()),
259            ),
260            ArrangementFlavor::Trace(_, oks, errs) => (
261                oks.clone().as_collection(logic),
262                errs.clone().as_collection(|k, &()| k.clone()),
263            ),
264        }
265    }
266
267    /// Constructs and applies logic to elements of `self` and returns the results.
268    ///
269    /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a
270    /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for
271    /// MFP-style `DataflowErrorSer` updates. It must return the number of records *produced*
272    /// (written to either session), not the number of input tuples consumed.
273    ///
274    /// # Fuel
275    ///
276    /// The operator accumulates the returned counts as fuel and yields when the total reaches
277    /// an internal refuel threshold. The metric is output-produced (not input-consumed) on
278    /// purpose: it regulates two asymmetric pressures.
279    ///
280    /// * **Drain inputs.** The operator holds a clone of each pending `Batch` until its work
281    ///   item pops; we want to release that memory back to the upstream arrangement as soon
282    ///   as possible. A `filter(false)` MFP returns 0 for every tuple, so fuel never trips
283    ///   and the cursor runs to end-of-batch in one activation.
284    /// * **Throttle outputs.** A `map("1KB-string")` MFP produces large records per input;
285    ///   stopping when emit count hits the threshold caps how much data a single activation
286    ///   dumps on the next operator.
287    ///
288    /// The refuel constant is a pragmatic compromise: large enough to be a non-event in
289    /// steady-state, small enough that one activation can't flood downstream. There is no
290    /// universal value across MFP shapes.
291    ///
292    /// If `key` is set, this is a promise that `logic` will produce no results on
293    /// records for which the key does not evaluate to the value. This is used to
294    /// leap directly to exactly those records.
295    ///
296    /// The `max_demand` parameter limits the number of columns decoded from the
297    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
298    /// decode all columns.
299    pub fn flat_map<D, DCB, L>(
300        &self,
301        key: Option<&Row>,
302        max_demand: usize,
303        logic: L,
304    ) -> (
305        Stream<'scope, T, DCB::Container>,
306        VecCollection<'scope, T, DataflowErrorSer, Diff>,
307    )
308    where
309        D: Data,
310        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
311        L: for<'a, 'b> FnMut(
312                &'a mut DatumVecBorrow<'b>,
313                T,
314                Diff,
315                &mut Session<T, DCB>,
316                &mut Session<T, ECB<T>>,
317            ) -> usize
318            + 'static,
319    {
320        // `logic` is passed straight through to `flat_map_core_fallible`, which owns the per-row
321        // decode (and the activation-scoped arena it decodes into).
322        match &self {
323            ArrangementFlavor::Local(oks, errs) => {
324                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
325                    oks.clone(),
326                    key,
327                    max_demand,
328                    logic,
329                    REFUEL,
330                );
331                let errs = errs.clone().as_collection(|k, &()| k.clone());
332                let errs = errs.concat(mfp_errs.as_collection());
333                (oks, errs)
334            }
335            ArrangementFlavor::Trace(_, oks, errs) => {
336                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
337                    oks.clone(),
338                    key,
339                    max_demand,
340                    logic,
341                    REFUEL,
342                );
343                let errs = errs.clone().as_collection(|k, &()| k.clone());
344                let errs = errs.concat(mfp_errs.as_collection());
345                (oks, errs)
346            }
347        }
348    }
349
350    /// Ok-only variant of [`Self::flat_map`]. The `logic` callback receives a single output
351    /// session, cannot produce errors, and returns the number of records produced (see
352    /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from
353    /// the arrangement; no extra operator is built to carry an empty MFP-error stream.
354    pub fn flat_map_ok<D, DCB, L>(
355        &self,
356        key: Option<&Row>,
357        max_demand: usize,
358        logic: L,
359    ) -> (
360        Stream<'scope, T, DCB::Container>,
361        VecCollection<'scope, T, DataflowErrorSer, Diff>,
362    )
363    where
364        D: Data,
365        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
366        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session<T, DCB>) -> usize
367            + 'static,
368    {
369        match &self {
370            ArrangementFlavor::Local(oks, errs) => {
371                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
372                    oks.clone(),
373                    key,
374                    max_demand,
375                    logic,
376                    REFUEL,
377                );
378                let errs = errs.clone().as_collection(|k, &()| k.clone());
379                (oks, errs)
380            }
381            ArrangementFlavor::Trace(_, oks, errs) => {
382                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
383                    oks.clone(),
384                    key,
385                    max_demand,
386                    logic,
387                    REFUEL,
388                );
389                let errs = errs.clone().as_collection(|k, &()| k.clone());
390                (oks, errs)
391            }
392        }
393    }
394}
395impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
396    /// The scope containing the collection bundle.
397    pub fn scope(&self) -> Scope<'scope, T> {
398        match self {
399            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
400            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
401        }
402    }
403
404    /// Brings the arrangement flavor into a region.
405    pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
406        match self {
407            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
408                oks.clone().enter_region(region),
409                errs.clone().enter_region(region),
410            ),
411            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
412                *gid,
413                oks.clone().enter_region(region),
414                errs.clone().enter_region(region),
415            ),
416        }
417    }
418}
419impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
420    /// Extracts the arrangement flavor from a region.
421    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
422        match self {
423            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
424                oks.clone().leave_region(outer),
425                errs.clone().leave_region(outer),
426            ),
427            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
428                *gid,
429                oks.clone().leave_region(outer),
430                errs.clone().leave_region(outer),
431            ),
432        }
433    }
434}
435
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it contains at least one valid
/// source of data: either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The unarranged representation, if present, as a pair of `(ok rows, errors)`
    /// collections.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions they are arranged on.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
448
449impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
450    /// Construct a new collection bundle from update streams.
451    pub fn from_collections(
452        oks: VecCollection<'scope, T, Row, Diff>,
453        errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
454    ) -> Self {
455        Self {
456            collection: Some((oks, errs)),
457            arranged: BTreeMap::default(),
458        }
459    }
460
461    /// Inserts arrangements by the expressions on which they are keyed.
462    pub fn from_expressions(
463        exprs: Vec<MirScalarExpr>,
464        arrangements: ArrangementFlavor<'scope, T>,
465    ) -> Self {
466        let mut arranged = BTreeMap::new();
467        arranged.insert(exprs, arrangements);
468        Self {
469            collection: None,
470            arranged,
471        }
472    }
473
474    /// Inserts arrangements by the columns on which they are keyed.
475    pub fn from_columns<I: IntoIterator<Item = usize>>(
476        columns: I,
477        arrangements: ArrangementFlavor<'scope, T>,
478    ) -> Self {
479        let mut keys = Vec::new();
480        for column in columns {
481            keys.push(MirScalarExpr::column(column));
482        }
483        Self::from_expressions(keys, arrangements)
484    }
485
486    /// The scope containing the collection bundle.
487    pub fn scope(&self) -> Scope<'scope, T> {
488        if let Some((oks, _errs)) = &self.collection {
489            oks.inner.scope()
490        } else {
491            self.arranged
492                .values()
493                .next()
494                .expect("Must contain a valid collection")
495                .scope()
496        }
497    }
498
499    /// Brings the collection bundle into a region.
500    pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
501        CollectionBundle {
502            collection: self.collection.as_ref().map(|(oks, errs)| {
503                (
504                    oks.clone().enter_region(region),
505                    errs.clone().enter_region(region),
506                )
507            }),
508            arranged: self
509                .arranged
510                .iter()
511                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
512                .collect(),
513        }
514    }
515}
516
517impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
518    /// Extracts the collection bundle from a region.
519    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
520        CollectionBundle {
521            collection: self.collection.as_ref().map(|(oks, errs)| {
522                (
523                    oks.clone().leave_region(outer),
524                    errs.clone().leave_region(outer),
525                )
526            }),
527            arranged: self
528                .arranged
529                .iter()
530                .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
531                .collect(),
532        }
533    }
534}
535
536impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
537    /// Asserts that the arrangement for a specific key
538    /// (or the raw collection for no key) exists,
539    /// and returns the corresponding collection.
540    ///
541    /// This returns the collection as-is, without
542    /// doing any unthinning transformation.
543    /// Therefore, it should be used when the appropriate transformation
544    /// was planned as part of a following MFP.
545    ///
546    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
547    /// the fueled `flat_map` or `as_collection` method, depending on the flag
548    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
549    pub fn as_specific_collection(
550        &self,
551        key: Option<&[MirScalarExpr]>,
552        config_set: &ConfigSet,
553    ) -> (
554        VecCollection<'scope, T, Row, Diff>,
555        VecCollection<'scope, T, DataflowErrorSer, Diff>,
556    ) {
557        // Any operator that uses this method was told to use a particular
558        // collection during LIR planning, where we should have made
559        // sure that that collection exists.
560        //
561        // If it doesn't, we panic.
562        match key {
563            None => self
564                .collection
565                .clone()
566                .expect("The unarranged collection doesn't exist."),
567            Some(key) => {
568                let arranged = self.arranged.get(key).unwrap_or_else(|| {
569                    panic!("The collection arranged by {:?} doesn't exist.", key)
570                });
571                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
572                    // Decode all columns, pass max_demand as usize::MAX. Output is 1:1 from the
573                    // cursor (no duplicates), so a non-consolidating container builder is the
574                    // right choice.
575                    let (ok, err) = arranged
576                        .flat_map_ok::<_, CapacityContainerBuilder<Vec<(Row, T, Diff)>>, _>(
577                            None,
578                            usize::MAX,
579                            |borrow, t, r, ok_session| {
580                                ok_session.give((SharedRow::pack(borrow.iter()), t, r));
581                                1
582                            },
583                        );
584                    (ok.as_collection(), err)
585                } else {
586                    #[allow(deprecated)]
587                    arranged.as_collection()
588                }
589            }
590        }
591    }
592
593    /// Constructs and applies logic to elements of a collection and returns the results.
594    ///
595    /// The function applies `logic` on elements. The logic conceptually receives
596    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
597    ///
598    /// If `key_val` is set, this is a promise that `logic` will produce no results on
599    /// records for which the key does not evaluate to the value. This is used when we
600    /// have an arrangement by that key to leap directly to exactly those records.
601    /// It is important that `logic` still guard against data that does not satisfy
602    /// this constraint, as this method does not statically know that it will have
603    /// that arrangement.
604    ///
605    /// The `max_demand` parameter limits the number of columns decoded from the
606    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
607    /// decode all columns.
608    pub fn flat_map<D, DCB, L>(
609        &self,
610        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
611        max_demand: usize,
612        mut logic: L,
613    ) -> (
614        Stream<'scope, T, DCB::Container>,
615        VecCollection<'scope, T, DataflowErrorSer, Diff>,
616    )
617    where
618        D: Data,
619        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
620        L: for<'a> FnMut(
621                &'a mut DatumVecBorrow<'_>,
622                T,
623                Diff,
624                &mut Session<T, DCB>,
625                &mut Session<T, ECB<T>>,
626            ) -> usize
627            + 'static,
628    {
629        // If `key_val` is set, we should have to use the corresponding arrangement.
630        // If there isn't one, that implies an error in the contract between
631        // key-production and available arrangements.
632        if let Some((key, val)) = key_val {
633            self.arrangement(&key)
634                .expect("Should have ensured during planning that this arrangement exists.")
635                .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic)
636        } else {
637            let (oks, errs) = self
638                .collection
639                .clone()
640                .expect("Invariant violated: CollectionBundle contains no collection.");
641            let scope = oks.inner.scope();
642            let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope);
643            let (ok_output, ok_stream) = builder.new_output();
644            let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
645            let (err_output, err_stream) = builder.new_output();
646            let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
647            let mut input = builder.new_input(oks.inner, Pipeline);
648            builder.build(move |_capabilities| {
649                let mut datums = DatumVec::new();
650                move |_frontiers| {
651                    let mut ok_output = ok_output.activate();
652                    let mut err_output = err_output.activate();
653                    input.for_each(|time, data| {
654                        // Retain the input capability to derive a `Capability` for each output;
655                        // the `Session` type alias is fixed to `Capability<T>`.
656                        let ok_cap = time.retain(0);
657                        let err_cap = time.retain(1);
658                        let mut ok_session = ok_output.session_with_builder(&ok_cap);
659                        let mut err_session = err_output.session_with_builder(&err_cap);
660                        for (v, t, d) in data.drain(..) {
661                            logic(
662                                &mut datums.borrow_with_limit(&v, max_demand),
663                                t,
664                                d,
665                                &mut ok_session,
666                                &mut err_session,
667                            );
668                        }
669                    });
670                }
671            });
672            let errs = errs.concat(err_stream.as_collection());
673            (ok_stream, errs)
674        }
675    }
676
677    /// Factored out common logic for using literal keys in general traces.
678    ///
679    /// This logic is sufficiently interesting that we want to write it only
680    /// once, and thereby avoid any skew in the two uses of the logic.
681    ///
682    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
683    /// where key and value are potentially specialized, but convertible into rows. The `logic`
684    /// callback writes ok results into the first session and errors into the second, returning
685    /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel
686    /// rationale.
687    fn flat_map_core_fallible<Tr, D, DCB, L>(
688        trace: Arranged<'scope, Tr>,
689        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
690        max_demand: usize,
691        mut logic: L,
692        refuel: usize,
693    ) -> (
694        Stream<'scope, T, DCB::Container>,
695        Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>,
696    )
697    where
698        Tr: for<'a> TraceReader<
699                Key<'a>: ExtendDatums,
700                Val<'a>: ExtendDatums,
701                Time = T,
702                Diff = mz_repr::Diff,
703            > + Clone
704            + 'static,
705        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
706        D: Data,
707        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
708        // `logic` receives the key and value already decoded into a `DatumVecBorrow`. The decode
709        // (and its arena/`DatumVec`) lives in the per-activation closure below, so it is scoped to
710        // a single scheduling invocation rather than to the operator.
711        L: for<'a, 'b> FnMut(
712                &'a mut DatumVecBorrow<'b>,
713                T,
714                mz_repr::Diff,
715                &mut Session<T, DCB>,
716                &mut Session<T, ECB<T>>,
717            ) -> usize
718            + 'static,
719    {
720        let scope = trace.stream.scope();
721
722        let mut key_con = Tr::KeyContainer::with_capacity(1);
723        if let Some(key) = &key {
724            key_con.push_own(key);
725        }
726        let mode = if key.is_some() { "index" } else { "scan" };
727        let name = format!("ArrangementFlatMap({})", mode);
728
729        let mut builder = OperatorBuilder::new(name, scope.clone());
730        let (ok_output, ok_stream) = builder.new_output();
731        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
732        let (err_output, err_stream) = builder.new_output();
733        let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
734        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
735        let operator_info = builder.operator_info();
736
737        builder.build(move |_capabilities| {
738            // Acquire an activator to reschedule the operator when it has unfinished work.
739            let activator = scope.activator_for(operator_info.address);
740            // Maintain a list of work to do, cursor to navigate and process.
741            let mut todo = std::collections::VecDeque::new();
742            move |_frontiers| {
743                let key = key_con.get(0);
744                let mut ok_output = ok_output.activate();
745                let mut err_output = err_output.activate();
746
747                // First, dequeue all batches.
748                input.for_each(|time, data| {
749                    // Retain a capability for each output, as the work may complete across
750                    // multiple activations.
751                    let ok_cap = time.retain(0);
752                    let err_cap = time.retain(1);
753                    for batch in data.iter() {
754                        todo.push_back(PendingWork::new(
755                            ok_cap.clone(),
756                            err_cap.clone(),
757                            batch.cursor(),
758                            batch.clone(),
759                        ));
760                    }
761                });
762
763                // Decode the key/value of each record into datums for `logic`. The arena and datum
764                // buffer are created here, so they are scoped to this activation (dropped when it
765                // returns) rather than retained for the operator's lifetime; both are reused across
766                // the records processed within the activation.
767                let mut temp_storage = RowArena::new();
768                let mut datums = DatumVec::new();
769                let mut decode_logic =
770                    |k: Tr::Key<'_>,
771                     v: Tr::Val<'_>,
772                     t: T,
773                     d: mz_repr::Diff,
774                     ok_session: &mut Session<T, DCB>,
775                     err_session: &mut Session<T, ECB<T>>| {
776                        temp_storage.clear();
777                        let mut datums_borrow = datums.borrow();
778                        k.extend_datums(&temp_storage, &mut datums_borrow, Some(max_demand));
779                        let remaining = max_demand.saturating_sub(datums_borrow.len());
780                        v.extend_datums(&temp_storage, &mut datums_borrow, Some(remaining));
781                        logic(&mut datums_borrow, t, d, ok_session, err_session)
782                    };
783
784                // Second, make progress on `todo`.
785                let mut fuel = refuel;
786                while !todo.is_empty() && fuel > 0 {
787                    todo.front_mut().unwrap().do_work(
788                        key.as_ref(),
789                        &mut decode_logic,
790                        &mut fuel,
791                        &mut ok_output,
792                        &mut err_output,
793                    );
794                    if fuel > 0 {
795                        todo.pop_front();
796                    }
797                }
798                // If we have not finished all work, re-activate the operator.
799                if !todo.is_empty() {
800                    activator.activate();
801                }
802            }
803        });
804
805        (ok_stream, err_stream)
806    }
807
808    /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results
809    /// into a single output session and returns the number of records produced (see the
810    /// fallible variant for fuel semantics). Use this when the caller statically knows it
811    /// will never produce `DataflowErrorSer` records, to avoid building a second output port
812    /// and the empty err stream that would follow it.
813    fn flat_map_core_ok<Tr, D, DCB, L>(
814        trace: Arranged<'scope, Tr>,
815        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
816        max_demand: usize,
817        mut logic: L,
818        refuel: usize,
819    ) -> Stream<'scope, T, DCB::Container>
820    where
821        Tr: for<'a> TraceReader<
822                Key<'a>: ExtendDatums,
823                Val<'a>: ExtendDatums,
824                Time = T,
825                Diff = mz_repr::Diff,
826            > + Clone
827            + 'static,
828        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
829        D: Data,
830        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
831        // See `flat_map_core_fallible`: `logic` takes already-decoded datums; the decode lives in
832        // the per-activation closure below.
833        L: for<'a, 'b> FnMut(
834                &'a mut DatumVecBorrow<'b>,
835                T,
836                mz_repr::Diff,
837                &mut Session<T, DCB>,
838            ) -> usize
839            + 'static,
840    {
841        let scope = trace.stream.scope();
842
843        let mut key_con = Tr::KeyContainer::with_capacity(1);
844        if let Some(key) = &key {
845            key_con.push_own(key);
846        }
847        let mode = if key.is_some() { "index" } else { "scan" };
848        let name = format!("ArrangementFlatMapOk({})", mode);
849
850        let mut builder = OperatorBuilder::new(name, scope.clone());
851        let (ok_output, ok_stream) = builder.new_output();
852        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
853        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
854        let operator_info = builder.operator_info();
855
856        builder.build(move |_capabilities| {
857            let activator = scope.activator_for(operator_info.address);
858            let mut todo = std::collections::VecDeque::new();
859            move |_frontiers| {
860                let key = key_con.get(0);
861                let mut ok_output = ok_output.activate();
862
863                input.for_each(|time, data| {
864                    let cap = time.retain(0);
865                    for batch in data.iter() {
866                        todo.push_back(PendingWorkOk::new(
867                            cap.clone(),
868                            batch.cursor(),
869                            batch.clone(),
870                        ));
871                    }
872                });
873
874                // Activation-scoped decode storage; see `flat_map_core_fallible`.
875                let mut temp_storage = RowArena::new();
876                let mut datums = DatumVec::new();
877                let mut decode_logic =
878                    |k: Tr::Key<'_>,
879                     v: Tr::Val<'_>,
880                     t: T,
881                     d: mz_repr::Diff,
882                     ok_session: &mut Session<T, DCB>| {
883                        temp_storage.clear();
884                        let mut datums_borrow = datums.borrow();
885                        k.extend_datums(&temp_storage, &mut datums_borrow, Some(max_demand));
886                        let remaining = max_demand.saturating_sub(datums_borrow.len());
887                        v.extend_datums(&temp_storage, &mut datums_borrow, Some(remaining));
888                        logic(&mut datums_borrow, t, d, ok_session)
889                    };
890
891                let mut fuel = refuel;
892                while !todo.is_empty() && fuel > 0 {
893                    todo.front_mut().unwrap().do_work(
894                        key.as_ref(),
895                        &mut decode_logic,
896                        &mut fuel,
897                        &mut ok_output,
898                    );
899                    if fuel > 0 {
900                        todo.pop_front();
901                    }
902                }
903                if !todo.is_empty() {
904                    activator.activate();
905                }
906            }
907        });
908
909        ok_stream
910    }
911
912    /// Look up an arrangement by the expressions that form the key.
913    ///
914    /// The result may be `None` if no such arrangement exists, or it may be one of many
915    /// "arrangement flavors" that represent the types of arranged data we might have.
916    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
917        self.arranged.get(key).map(|x| x.clone())
918    }
919}
920
921impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
922    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
923    ///
924    /// This operator is able to apply the logic of `mfp` early, which can substantially
925    /// reduce the amount of data produced when `mfp` is non-trivial.
926    ///
927    /// The `key_val` argument, when present, indicates that a specific arrangement should
928    /// be used, and if, in addition, the `val` component is present,
929    /// that we can seek to the supplied row.
930    pub fn as_collection_core(
931        &self,
932        mut mfp: MapFilterProject,
933        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
934        until: Antichain<mz_repr::Timestamp>,
935        config_set: &ConfigSet,
936    ) -> (
937        VecCollection<'scope, T, mz_repr::Row, Diff>,
938        VecCollection<'scope, T, DataflowErrorSer, Diff>,
939    ) {
940        mfp.optimize();
941        let mfp_plan = mfp.clone().into_plan().unwrap();
942
943        // If the MFP is trivial, we can just call `as_collection`.
944        // In the case that we weren't going to apply the `key_val` optimization,
945        // this path results in a slightly smaller and faster
946        // dataflow graph, and is intended to fix
947        // https://github.com/MaterializeInc/database-issues/issues/3111
948        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
949            true
950        } else {
951            false
952        };
953
954        if mfp_plan.is_identity() && !has_key_val {
955            let key = key_val.map(|(k, _v)| k);
956            return self.as_specific_collection(key.as_deref(), config_set);
957        }
958
959        let max_demand = mfp.demand().last().map(|x| *x + 1).unwrap_or(0);
960        mfp.permute_fn(|c| c, max_demand);
961        mfp.optimize();
962        let mfp_plan = mfp.into_plan().unwrap();
963
964        let mut datum_vec = DatumVec::new();
965        // Wrap in an `Rc` so that lifetimes work out.
966        let until = std::rc::Rc::new(until);
967
968        let (stream, errors) = self
969            .flat_map::<_, ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>, _>(
970                key_val,
971                max_demand,
972                move |row_datums, time, diff, ok_session, err_session| {
973                    let mut row_builder = SharedRow::get();
974                    let until = std::rc::Rc::clone(&until);
975                    let temp_storage = RowArena::new();
976                    let row_iter = row_datums.iter();
977                    let mut datums_local = datum_vec.borrow();
978                    datums_local.extend(row_iter);
979                    let event_time = time.event_time();
980                    let mut work: usize = 0;
981                    for result in mfp_plan.evaluate(
982                        &mut datums_local,
983                        &temp_storage,
984                        event_time,
985                        diff.clone(),
986                        move |time| !until.less_equal(time),
987                        &mut row_builder,
988                    ) {
989                        work += 1;
990                        match result {
991                            Ok((row, event_time, diff)) => {
992                                // Copy the whole time, and re-populate event time.
993                                let mut time: T = time.clone();
994                                *time.event_time_mut() = event_time;
995                                ok_session.give((row, time, diff));
996                            }
997                            Err((e, event_time, diff)) => {
998                                // Copy the whole time, and re-populate event time.
999                                let mut time: T = time.clone();
1000                                *time.event_time_mut() = event_time;
1001                                err_session.give((e, time, diff));
1002                            }
1003                        }
1004                    }
1005                    work
1006                },
1007            );
1008
1009        (stream.as_collection(), errors)
1010    }
1011    pub fn ensure_collections(
1012        mut self,
1013        collections: AvailableCollections,
1014        input_key: Option<Vec<MirScalarExpr>>,
1015        input_mfp: MapFilterProject,
1016        as_of: Antichain<mz_repr::Timestamp>,
1017        until: Antichain<mz_repr::Timestamp>,
1018        config_set: &ConfigSet,
1019        strategy: ArrangementStrategy,
1020    ) -> Self
1021    where
1022        T: MaybeBucketByTime,
1023    {
1024        if collections == Default::default() {
1025            return self;
1026        }
1027        // Cache collection to avoid reforming it each time.
1028        //
1029        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
1030        // as the `map_fallible` that follows could be run against an arrangement itself.
1031        //
1032        // Note(btv): If we ever do that, we would then only need to make the raw collection here
1033        // if `collections.raw` is true.
1034
1035        for (key, _, _) in collections.arranged.iter() {
1036            soft_assert_or_log!(
1037                !self.arranged.contains_key(key),
1038                "LIR ArrangeBy tried to create an existing arrangement"
1039            );
1040        }
1041
1042        // Track whether we already applied temporal bucketing in this call, to
1043        // avoid bucketing the same updates twice.
1044        let mut bucketed = false;
1045
1046        // True iff at least one new arrangement will actually be built below. Bucketing only
1047        // pays off when something downstream merges/compacts the future-stamped updates; on a
1048        // pure raw collection (no new arrangement) the work is wasted.
1049        let will_create_arrangement = collections
1050            .arranged
1051            .iter()
1052            .any(|(key, _, _)| !self.arranged.contains_key(key));
1053
1054        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
1055        let form_raw_collection = collections.raw || will_create_arrangement;
1056        if form_raw_collection && self.collection.is_none() {
1057            let (oks, errs) =
1058                self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
1059            // Apply temporal bucketing when the lowering selected `TemporalBucketing` and
1060            // we will build at least one arrangement. This path fires when the collection
1061            // must be formed from scratch (e.g., from an arrangement via as_collection_core).
1062            let effective_strategy = if will_create_arrangement {
1063                strategy
1064            } else {
1065                ArrangementStrategy::Direct
1066            };
1067            let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1068                && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1069            {
1070                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1071                    .get(config_set)
1072                    .try_into()
1073                    .expect("must fit");
1074                bucketed = true;
1075                T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1076            } else {
1077                oks
1078            };
1079            self.collection = Some((oks, errs));
1080        }
1081        for (key, _, thinning) in collections.arranged {
1082            if !self.arranged.contains_key(&key) {
1083                // TODO: Consider allowing more expressive names.
1084                let name = format!("ArrangeBy[{:?}]", key);
1085
1086                let (oks, errs) = self
1087                    .collection
1088                    .take()
1089                    .expect("Collection constructed above");
1090                // Apply temporal bucketing if the collection already existed on
1091                // the bundle (e.g., from an upstream temporal Mfp or Get) and we
1092                // haven't bucketed yet. This is the common path for temporal-MFP
1093                // → ArrangeBy flows.
1094                let effective_strategy = if bucketed {
1095                    ArrangementStrategy::Direct
1096                } else {
1097                    strategy
1098                };
1099                let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1100                    && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1101                {
1102                    let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1103                        .get(config_set)
1104                        .try_into()
1105                        .expect("must fit");
1106                    bucketed = true;
1107                    T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1108                } else {
1109                    oks
1110                };
1111                let use_paged_path = ENABLE_COLUMN_PAGED_BATCHER.get(config_set);
1112                let (oks, errs_keyed, passthrough) = Self::arrange_collection(
1113                    &name,
1114                    oks,
1115                    key.clone(),
1116                    thinning.clone(),
1117                    use_paged_path,
1118                );
1119                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
1120                self.collection = Some((passthrough, errs));
1121                let errs =
1122                    errs_concat.mz_arrange::<
1123                        ColumnationChunker<_>,
1124                        ErrBatcher<_, _>,
1125                        ErrBuilder<_, _>,
1126                        ErrSpine<_, _>,
1127                    >(
1128                        &format!("{}-errors", name),
1129                    );
1130                self.arranged
1131                    .insert(key, ArrangementFlavor::Local(oks, errs));
1132            }
1133        }
1134        self
1135    }
1136
1137    /// Builds an arrangement from a collection, using the specified key and value thinning.
1138    ///
1139    /// The arrangement's key is based on the `key` expressions, and the value the input with
1140    /// the `thinning` applied to it. It selects which of the input columns are included in the
1141    /// value of the arrangement. The thinning is in support of permuting arrangements such that
1142    /// columns in the key are not included in the value.
1143    ///
1144    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
1145    /// the input as-is, which allows downstream consumers to reuse the collection without
1146    /// teeing the stream.
1147    fn arrange_collection(
1148        name: &String,
1149        oks: VecCollection<'scope, T, Row, Diff>,
1150        key: Vec<MirScalarExpr>,
1151        thinning: Vec<usize>,
1152        use_paged_path: bool,
1153    ) -> (
1154        Arranged<'scope, RowRowAgent<T, Diff>>,
1155        VecCollection<'scope, T, DataflowErrorSer, Diff>,
1156        VecCollection<'scope, T, Row, Diff>,
1157    ) {
1158        // This operator implements a `map_fallible`, but produces columnar updates for the ok
1159        // stream. The `map_fallible` cannot be used here because the closure cannot return
1160        // references, which is what we need to push into columnar streams. Instead, we use a
1161        // bespoke operator that also optimizes reuse of allocations across individual updates.
1162        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
1163        let (ok_output, ok_stream) = builder.new_output();
1164        let mut ok_output =
1165            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
1166        let (err_output, err_stream) = builder.new_output();
1167        let mut err_output = OutputBuilder::from(err_output);
1168        let (passthrough_output, passthrough_stream) = builder.new_output();
1169        let mut passthrough_output = OutputBuilder::from(passthrough_output);
1170        let mut input = builder.new_input(oks.inner, Pipeline);
1171        builder.set_notify_for(0, FrontierInterest::Never);
1172        builder.build(move |_capabilities| {
1173            let mut key_buf = Row::default();
1174            let mut val_buf = Row::default();
1175            let mut datums = DatumVec::new();
1176            move |_frontiers| {
1177                // Scoped to the activation so the arena's retained capacity does not outlive a
1178                // single scheduling invocation; cleared per row to reuse it within the batch.
1179                let mut temp_storage = RowArena::new();
1180                let mut ok_output = ok_output.activate();
1181                let mut err_output = err_output.activate();
1182                let mut passthrough_output = passthrough_output.activate();
1183                input.for_each(|time, data| {
1184                    let mut ok_session = ok_output.session_with_builder(&time);
1185                    let mut err_session = err_output.session(&time);
1186                    for (row, time, diff) in data.iter() {
1187                        temp_storage.clear();
1188                        let datums = datums.borrow_with(row);
1189                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
1190                        match key_buf.packer().try_extend(key_iter) {
1191                            Ok(()) => {
1192                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
1193                                val_buf.packer().extend(val_datum_iter);
1194                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
1195                            }
1196                            Err(e) => {
1197                                err_session.give((e.into(), time.clone(), *diff));
1198                            }
1199                        }
1200                    }
1201                    passthrough_output.session(&time).give_container(data);
1202                });
1203            }
1204        });
1205
1206        let exchange =
1207            ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, T, Diff>);
1208        let oks = if use_paged_path {
1209            ok_stream.mz_arrange_core::<
1210                _,
1211                batcher::ColumnChunker<_>,
1212                Col2ValPagedBatcher<_, _, _, _>,
1213                RowRowColPagedBuilder<_, _>,
1214                RowRowSpine<_, _>,
1215            >(exchange, name)
1216        } else {
1217            ok_stream.mz_arrange_core::<
1218                _,
1219                batcher::Chunker<_>,
1220                Col2ValBatcher<_, _, _, _>,
1221                RowRowBuilder<_, _>,
1222                RowRowSpine<_, _>,
1223            >(exchange, name)
1224        };
1225        (
1226            oks,
1227            err_stream.as_collection(),
1228            passthrough_stream.as_collection(),
1229        )
1230    }
1231}
1232
/// Type alias for a timely output `Session` whose capability type is `Capability<T>`.
///
/// The container builder `CB` is left to the caller. Sessions can therefore drive
/// consolidating, capacity, or (in the future) columnar output builders without changing
/// call sites.
type Session<'a, 'b, T, CB> =
    timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability<T>>;
1238
/// Container builder for the err output of the fallible flat_map operator
/// (`flat_map_core_fallible` / `PendingWork`); the ok-only variant has no err output.
///
/// Before the ok/err outputs were split, the merged Ok/Err stream flowed through a
/// [`ConsolidatingContainerBuilder`] ahead of the `map_fallible` demux. We preserve that
/// consolidation here so that errors with the same `(error, time)` consolidate (and can
/// cancel) within a batch instead of propagating downstream.
type ECB<T> = ConsolidatingContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>;
1244
/// Number of output records the arrangement flat_map operators may produce in one activation
/// before yielding. See [`ArrangementFlavor::flat_map`] for the fuel rationale; the constant
/// is a pragmatic compromise and has not been tuned empirically.
// NOTE(review): presumably this is the `refuel` argument of `flat_map_core_fallible` /
// `flat_map_core_ok`; the call site is not in this part of the file.
const REFUEL: usize = 1_000_000;
1249
1250struct PendingWork<C>
1251where
1252    C: Cursor,
1253{
1254    /// Capability for the `ok` output (output port 0).
1255    ok_capability: Capability<C::Time>,
1256    /// Capability for the `err` output (output port 1).
1257    err_capability: Capability<C::Time>,
1258    cursor: C,
1259    batch: C::Storage,
1260}
1261
1262impl<C> PendingWork<C>
1263where
1264    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1265{
1266    /// Create a new bundle of pending work, from a pair of capabilities (one per output),
1267    /// a cursor, and backing storage.
1268    fn new(
1269        ok_capability: Capability<C::Time>,
1270        err_capability: Capability<C::Time>,
1271        cursor: C,
1272        batch: C::Storage,
1273    ) -> Self {
1274        Self {
1275            ok_capability,
1276            err_capability,
1277            cursor,
1278            batch,
1279        }
1280    }
1281    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to
1282    /// the two output sessions.
1283    fn do_work<D, DCB, L>(
1284        &mut self,
1285        key: Option<&C::Key<'_>>,
1286        logic: &mut L,
1287        fuel: &mut usize,
1288        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1289        err_output: &mut OutputBuilderSession<'_, C::Time, ECB<C::Time>>,
1290    ) where
1291        D: Data,
1292        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1293        L: FnMut(
1294            C::Key<'_>,
1295            C::Val<'_>,
1296            C::Time,
1297            C::Diff,
1298            &mut Session<C::Time, DCB>,
1299            &mut Session<C::Time, ECB<C::Time>>,
1300        ) -> usize,
1301    {
1302        let mut ok_session = ok_output.session_with_builder(&self.ok_capability);
1303        let mut err_session = err_output.session_with_builder(&self.err_capability);
1304        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1305            logic(k, v, t, d, &mut ok_session, &mut err_session)
1306        });
1307    }
1308}
1309
1310/// Pending work for the Ok-only variant of `flat_map_core_fallible`. Holds a single capability since
1311/// the operator has only one output port.
1312struct PendingWorkOk<C>
1313where
1314    C: Cursor,
1315{
1316    capability: Capability<C::Time>,
1317    cursor: C,
1318    batch: C::Storage,
1319}
1320
1321impl<C> PendingWorkOk<C>
1322where
1323    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1324{
1325    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
1326        Self {
1327            capability,
1328            cursor,
1329            batch,
1330        }
1331    }
1332
1333    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to
1334    /// the single output session.
1335    fn do_work<D, DCB, L>(
1336        &mut self,
1337        key: Option<&C::Key<'_>>,
1338        logic: &mut L,
1339        fuel: &mut usize,
1340        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1341    ) where
1342        D: Data,
1343        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1344        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session<C::Time, DCB>) -> usize,
1345    {
1346        let mut ok_session = ok_output.session_with_builder(&self.capability);
1347        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1348            logic(k, v, t, d, &mut ok_session)
1349        });
1350    }
1351}
1352
1353/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If
1354/// `key` is set, the cursor is seeked to it and only values for that key are produced.
1355///
1356/// `emit` returns the number of records it produced for the given input tuple. The cursor
1357/// stops as soon as the accumulated emit count reaches `*fuel`, leaving the cursor in place
1358/// so work can resume on a later call. Within a batch, both the inner val loop and the
1359/// outer key loop are bounded only by emit count, so selective filters (`emit` returns 0)
1360/// run to batch completion in a single activation — see [`ArrangementFlavor::flat_map`]
1361/// for why fuel counts output rather than input.
1362fn walk_cursor<C, F>(
1363    cursor: &mut C,
1364    batch: &C::Storage,
1365    key: Option<&C::Key<'_>>,
1366    fuel: &mut usize,
1367    mut emit: F,
1368) where
1369    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1370    F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize,
1371{
1372    use differential_dataflow::consolidation::consolidate;
1373
1374    let mut work: usize = 0;
1375    let mut buffer = Vec::new();
1376    if let Some(key) = key {
1377        let key = C::KeyContainer::reborrow(*key);
1378        if cursor.get_key(batch).map(|k| k == key) != Some(true) {
1379            cursor.seek_key(batch, key);
1380        }
1381        if cursor.get_key(batch).map(|k| k == key) == Some(true) {
1382            let key = cursor.key(batch);
1383            while let Some(val) = cursor.get_val(batch) {
1384                cursor.map_times(batch, |time, diff| {
1385                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
1386                });
1387                consolidate(&mut buffer);
1388                for (time, diff) in buffer.drain(..) {
1389                    work += emit(key, val, time, diff);
1390                }
1391                cursor.step_val(batch);
1392                if work >= *fuel {
1393                    *fuel = 0;
1394                    return;
1395                }
1396            }
1397        }
1398    } else {
1399        while let Some(key) = cursor.get_key(batch) {
1400            while let Some(val) = cursor.get_val(batch) {
1401                cursor.map_times(batch, |time, diff| {
1402                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
1403                });
1404                consolidate(&mut buffer);
1405                for (time, diff) in buffer.drain(..) {
1406                    work += emit(key, val, time, diff);
1407                }
1408                cursor.step_val(batch);
1409                if work >= *fuel {
1410                    *fuel = 0;
1411                    return;
1412                }
1413            }
1414            cursor.step_key(batch);
1415        }
1416    }
1417    *fuel -= work;
1418}