Skip to main content

mz_compute/render/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of dataflow-local state, like arrangements, while building a
11//! dataflow.
12
13use std::collections::BTreeMap;
14use std::rc::Rc;
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::operators::arrange::Arranged;
18use differential_dataflow::trace::implementations::BatchContainer;
19use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dataflows::DataflowDescription;
22use mz_compute_types::dyncfgs::{
23    ENABLE_COLUMN_PAGED_BATCHER, ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION,
24    ENABLE_COMPUTE_TEMPORAL_BUCKETING, TEMPORAL_BUCKETING_SUMMARY,
25};
26use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
27use mz_dyncfg::ConfigSet;
28use mz_expr::{Eval, Id, MapFilterProject, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
32use mz_storage_types::controller::CollectionMetadata;
33use mz_timely_util::columnar::batcher;
34use mz_timely_util::columnar::builder::ColumnBuilder;
35use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange};
36use mz_timely_util::columnation::ColumnationChunker;
37use timely::ContainerBuilder;
38use timely::container::{CapacityContainerBuilder, PushInto};
39use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
40use timely::dataflow::operators::Capability;
41use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
42use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
43use timely::dataflow::{Scope, Stream};
44use timely::progress::operate::FrontierInterest;
45use timely::progress::{Antichain, Timestamp};
46
47use crate::compute_state::ComputeState;
48use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
49use crate::render::errors::{DataflowErrorSer, ErrorLogger};
50use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
51use crate::typedefs::{
52    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
53};
54use mz_row_spine::{DatumSeq, RowRowBuilder, RowRowColPagedBuilder};
55
/// Dataflow-local collections and arrangements.
///
/// A context wraps the data assets available while rendering a dataflow and presents them in
/// an easy-to-use manner. These assets include dataflow-local collections and arrangements, as
/// well as arrangements imported from outside the dataflow.
///
/// `Context` has a timestamp type `T`, which is the timestamp used by the scope in question.
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: Scope<'scope, T>,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    ///
    /// Taken from the first element of the scope's address when the context is created.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    ///
    /// Falls back to the minimum timestamp when the dataflow description carries no `as_of`.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Bindings of identifiers to collections.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    ///
    /// Always `None` for transient dataflows, for which compute event logging is skipped.
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never advance
    /// past this frontier, except the empty frontier.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
94
95impl<'scope, T: RenderTimestamp> Context<'scope, T> {
96    /// Creates a new empty Context.
97    pub fn for_dataflow_in<Plan>(
98        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
99        scope: Scope<'scope, T>,
100        compute_state: &ComputeState,
101        until: Antichain<mz_repr::Timestamp>,
102        dataflow_expiration: Antichain<mz_repr::Timestamp>,
103    ) -> Self {
104        use mz_ore::collections::CollectionExt as IteratorExt;
105        let dataflow_id = *scope.addr().into_first();
106        let as_of_frontier = dataflow
107            .as_of
108            .clone()
109            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
110
111        let export_ids = dataflow.export_ids().collect();
112
113        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
114        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
115        // want to reconsider in the future.
116        let compute_logger = if dataflow.is_transient() {
117            None
118        } else {
119            compute_state.compute_logger.clone()
120        };
121
122        Self {
123            scope,
124            debug_name: dataflow.debug_name.clone(),
125            dataflow_id,
126            export_ids,
127            as_of_frontier,
128            until,
129            bindings: BTreeMap::new(),
130            compute_logger,
131            linear_join_spec: compute_state.linear_join_spec,
132            dataflow_expiration,
133            config_set: Rc::clone(&compute_state.worker_config),
134        }
135    }
136}
137
138impl<'scope, T: RenderTimestamp> Context<'scope, T> {
139    /// Insert a collection bundle by an identifier.
140    ///
141    /// This is expected to be used to install external collections (sources, indexes, other views),
142    /// as well as for `Let` bindings of local collections.
143    pub fn insert_id(
144        &mut self,
145        id: Id,
146        collection: CollectionBundle<'scope, T>,
147    ) -> Option<CollectionBundle<'scope, T>> {
148        self.bindings.insert(id, collection)
149    }
150    /// Remove a collection bundle by an identifier.
151    ///
152    /// The primary use of this method is uninstalling `Let` bindings.
153    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
154        self.bindings.remove(&id)
155    }
156    /// Melds a collection bundle to whatever exists.
157    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
158        if !self.bindings.contains_key(&id) {
159            self.bindings.insert(id, collection);
160        } else {
161            let binding = self
162                .bindings
163                .get_mut(&id)
164                .expect("Binding verified to exist");
165            if collection.collection.is_some() {
166                binding.collection = collection.collection;
167            }
168            for (key, flavor) in collection.arranged.into_iter() {
169                binding.arranged.insert(key, flavor);
170            }
171        }
172    }
173    /// Look up a collection bundle by an identifier.
174    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
175        self.bindings.get(&id).cloned()
176    }
177
178    pub(super) fn error_logger(&self) -> ErrorLogger {
179        ErrorLogger::new(self.debug_name.clone())
180    }
181}
182
183impl<'scope, T: RenderTimestamp> Context<'scope, T> {
184    /// Brings the underlying arrangements and collections into a region.
185    pub fn enter_region<'a>(
186        &self,
187        region: Scope<'a, T>,
188        bindings: Option<&std::collections::BTreeSet<Id>>,
189    ) -> Context<'a, T> {
190        let bindings = self
191            .bindings
192            .iter()
193            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
194            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
195            .collect();
196
197        Context {
198            scope: region,
199            debug_name: self.debug_name.clone(),
200            dataflow_id: self.dataflow_id.clone(),
201            export_ids: self.export_ids.clone(),
202            as_of_frontier: self.as_of_frontier.clone(),
203            until: self.until.clone(),
204            compute_logger: self.compute_logger.clone(),
205            linear_join_spec: self.linear_join_spec.clone(),
206            bindings,
207            dataflow_expiration: self.dataflow_expiration.clone(),
208            config_set: Rc::clone(&self.config_set),
209        }
210    }
211}
212
/// Describes flavor of arrangement: local or imported trace.
///
/// Both variants pair an arrangement of `ok` data with an arrangement of errors.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// A dataflow-local arrangement.
    ///
    /// Holds the `ok` arrangement followed by the error arrangement.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    ///
    /// The identifier is followed by the `ok` arrangement and the error arrangement.
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
231
232impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
233    /// Presents `self` as a stream of updates.
234    ///
235    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
236    ///
237    /// This method presents the contents as they are, without further computation.
238    /// If you have logic that could be applied to each record, consider using the
239    /// `flat_map` methods which allows this and can reduce the work done.
240    #[deprecated(note = "Use `flat_map` instead.")]
241    pub fn as_collection(
242        &self,
243    ) -> (
244        VecCollection<'scope, T, Row, Diff>,
245        VecCollection<'scope, T, DataflowErrorSer, Diff>,
246    ) {
247        let mut datums = DatumVec::new();
248        let logic = move |k: DatumSeq, v: DatumSeq| {
249            let mut datums_borrow = datums.borrow();
250            datums_borrow.extend(k);
251            datums_borrow.extend(v);
252            SharedRow::pack(&**datums_borrow)
253        };
254        match &self {
255            ArrangementFlavor::Local(oks, errs) => (
256                oks.clone().as_collection(logic),
257                errs.clone().as_collection(|k, &()| k.clone()),
258            ),
259            ArrangementFlavor::Trace(_, oks, errs) => (
260                oks.clone().as_collection(logic),
261                errs.clone().as_collection(|k, &()| k.clone()),
262            ),
263        }
264    }
265
266    /// Constructs and applies logic to elements of `self` and returns the results.
267    ///
268    /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a
269    /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for
270    /// MFP-style `DataflowErrorSer` updates. It must return the number of records *produced*
271    /// (written to either session), not the number of input tuples consumed.
272    ///
273    /// # Fuel
274    ///
275    /// The operator accumulates the returned counts as fuel and yields when the total reaches
276    /// an internal refuel threshold. The metric is output-produced (not input-consumed) on
277    /// purpose: it regulates two asymmetric pressures.
278    ///
279    /// * **Drain inputs.** The operator holds a clone of each pending `Batch` until its work
280    ///   item pops; we want to release that memory back to the upstream arrangement as soon
281    ///   as possible. A `filter(false)` MFP returns 0 for every tuple, so fuel never trips
282    ///   and the cursor runs to end-of-batch in one activation.
283    /// * **Throttle outputs.** A `map("1KB-string")` MFP produces large records per input;
284    ///   stopping when emit count hits the threshold caps how much data a single activation
285    ///   dumps on the next operator.
286    ///
287    /// The refuel constant is a pragmatic compromise: large enough to be a non-event in
288    /// steady-state, small enough that one activation can't flood downstream. There is no
289    /// universal value across MFP shapes.
290    ///
291    /// If `key` is set, this is a promise that `logic` will produce no results on
292    /// records for which the key does not evaluate to the value. This is used to
293    /// leap directly to exactly those records.
294    ///
295    /// The `max_demand` parameter limits the number of columns decoded from the
296    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
297    /// decode all columns.
298    pub fn flat_map<D, DCB, L>(
299        &self,
300        key: Option<&Row>,
301        max_demand: usize,
302        mut logic: L,
303    ) -> (
304        Stream<'scope, T, DCB::Container>,
305        VecCollection<'scope, T, DataflowErrorSer, Diff>,
306    )
307    where
308        D: Data,
309        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
310        L: for<'a, 'b> FnMut(
311                &'a mut DatumVecBorrow<'b>,
312                T,
313                Diff,
314                &mut Session<T, DCB>,
315                &mut Session<T, ECB<T>>,
316            ) -> usize
317            + 'static,
318    {
319        let mut datums = DatumVec::new();
320        let logic = move |k: DatumSeq,
321                          v: DatumSeq,
322                          t,
323                          d,
324                          ok_session: &mut Session<T, DCB>,
325                          err_session: &mut Session<T, ECB<T>>| {
326            let mut datums_borrow = datums.borrow();
327            datums_borrow.extend(k.to_datum_iter().take(max_demand));
328            let max_demand = max_demand.saturating_sub(datums_borrow.len());
329            datums_borrow.extend(v.to_datum_iter().take(max_demand));
330            logic(&mut datums_borrow, t, d, ok_session, err_session)
331        };
332
333        match &self {
334            ArrangementFlavor::Local(oks, errs) => {
335                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
336                    oks.clone(),
337                    key,
338                    logic,
339                    REFUEL,
340                );
341                let errs = errs.clone().as_collection(|k, &()| k.clone());
342                let errs = errs.concat(mfp_errs.as_collection());
343                (oks, errs)
344            }
345            ArrangementFlavor::Trace(_, oks, errs) => {
346                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
347                    oks.clone(),
348                    key,
349                    logic,
350                    REFUEL,
351                );
352                let errs = errs.clone().as_collection(|k, &()| k.clone());
353                let errs = errs.concat(mfp_errs.as_collection());
354                (oks, errs)
355            }
356        }
357    }
358
359    /// Ok-only variant of [`Self::flat_map`]. The `logic` callback receives a single output
360    /// session, cannot produce errors, and returns the number of records produced (see
361    /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from
362    /// the arrangement; no extra operator is built to carry an empty MFP-error stream.
363    pub fn flat_map_ok<D, DCB, L>(
364        &self,
365        key: Option<&Row>,
366        max_demand: usize,
367        mut logic: L,
368    ) -> (
369        Stream<'scope, T, DCB::Container>,
370        VecCollection<'scope, T, DataflowErrorSer, Diff>,
371    )
372    where
373        D: Data,
374        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
375        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session<T, DCB>) -> usize
376            + 'static,
377    {
378        let mut datums = DatumVec::new();
379        let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session<T, DCB>| {
380            let mut datums_borrow = datums.borrow();
381            datums_borrow.extend(k.to_datum_iter().take(max_demand));
382            let max_demand = max_demand.saturating_sub(datums_borrow.len());
383            datums_borrow.extend(v.to_datum_iter().take(max_demand));
384            logic(&mut datums_borrow, t, d, ok_session)
385        };
386
387        match &self {
388            ArrangementFlavor::Local(oks, errs) => {
389                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
390                    oks.clone(),
391                    key,
392                    logic,
393                    REFUEL,
394                );
395                let errs = errs.clone().as_collection(|k, &()| k.clone());
396                (oks, errs)
397            }
398            ArrangementFlavor::Trace(_, oks, errs) => {
399                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
400                    oks.clone(),
401                    key,
402                    logic,
403                    REFUEL,
404                );
405                let errs = errs.clone().as_collection(|k, &()| k.clone());
406                (oks, errs)
407            }
408        }
409    }
410}
411impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
412    /// The scope containing the collection bundle.
413    pub fn scope(&self) -> Scope<'scope, T> {
414        match self {
415            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
416            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
417        }
418    }
419
420    /// Brings the arrangement flavor into a region.
421    pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
422        match self {
423            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
424                oks.clone().enter_region(region),
425                errs.clone().enter_region(region),
426            ),
427            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
428                *gid,
429                oks.clone().enter_region(region),
430                errs.clone().enter_region(region),
431            ),
432        }
433    }
434}
435impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
436    /// Extracts the arrangement flavor from a region.
437    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
438        match self {
439            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
440                oks.clone().leave_region(outer),
441                errs.clone().leave_region(outer),
442            ),
443            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
444                *gid,
445                oks.clone().leave_region(outer),
446                errs.clone().leave_region(outer),
447            ),
448        }
449    }
450}
451
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The unarranged representation, if available: the `ok` updates paired with the errors.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// The available arrangements, indexed by the expressions on which each is keyed.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
464
465impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
466    /// Construct a new collection bundle from update streams.
467    pub fn from_collections(
468        oks: VecCollection<'scope, T, Row, Diff>,
469        errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
470    ) -> Self {
471        Self {
472            collection: Some((oks, errs)),
473            arranged: BTreeMap::default(),
474        }
475    }
476
477    /// Inserts arrangements by the expressions on which they are keyed.
478    pub fn from_expressions(
479        exprs: Vec<MirScalarExpr>,
480        arrangements: ArrangementFlavor<'scope, T>,
481    ) -> Self {
482        let mut arranged = BTreeMap::new();
483        arranged.insert(exprs, arrangements);
484        Self {
485            collection: None,
486            arranged,
487        }
488    }
489
490    /// Inserts arrangements by the columns on which they are keyed.
491    pub fn from_columns<I: IntoIterator<Item = usize>>(
492        columns: I,
493        arrangements: ArrangementFlavor<'scope, T>,
494    ) -> Self {
495        let mut keys = Vec::new();
496        for column in columns {
497            keys.push(MirScalarExpr::column(column));
498        }
499        Self::from_expressions(keys, arrangements)
500    }
501
502    /// The scope containing the collection bundle.
503    pub fn scope(&self) -> Scope<'scope, T> {
504        if let Some((oks, _errs)) = &self.collection {
505            oks.inner.scope()
506        } else {
507            self.arranged
508                .values()
509                .next()
510                .expect("Must contain a valid collection")
511                .scope()
512        }
513    }
514
515    /// Brings the collection bundle into a region.
516    pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
517        CollectionBundle {
518            collection: self.collection.as_ref().map(|(oks, errs)| {
519                (
520                    oks.clone().enter_region(region),
521                    errs.clone().enter_region(region),
522                )
523            }),
524            arranged: self
525                .arranged
526                .iter()
527                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
528                .collect(),
529        }
530    }
531}
532
533impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
534    /// Extracts the collection bundle from a region.
535    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
536        CollectionBundle {
537            collection: self.collection.as_ref().map(|(oks, errs)| {
538                (
539                    oks.clone().leave_region(outer),
540                    errs.clone().leave_region(outer),
541                )
542            }),
543            arranged: self
544                .arranged
545                .iter()
546                .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
547                .collect(),
548        }
549    }
550}
551
552impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
553    /// Asserts that the arrangement for a specific key
554    /// (or the raw collection for no key) exists,
555    /// and returns the corresponding collection.
556    ///
557    /// This returns the collection as-is, without
558    /// doing any unthinning transformation.
559    /// Therefore, it should be used when the appropriate transformation
560    /// was planned as part of a following MFP.
561    ///
562    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
563    /// the fueled `flat_map` or `as_collection` method, depending on the flag
564    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
565    pub fn as_specific_collection(
566        &self,
567        key: Option<&[MirScalarExpr]>,
568        config_set: &ConfigSet,
569    ) -> (
570        VecCollection<'scope, T, Row, Diff>,
571        VecCollection<'scope, T, DataflowErrorSer, Diff>,
572    ) {
573        // Any operator that uses this method was told to use a particular
574        // collection during LIR planning, where we should have made
575        // sure that that collection exists.
576        //
577        // If it doesn't, we panic.
578        match key {
579            None => self
580                .collection
581                .clone()
582                .expect("The unarranged collection doesn't exist."),
583            Some(key) => {
584                let arranged = self.arranged.get(key).unwrap_or_else(|| {
585                    panic!("The collection arranged by {:?} doesn't exist.", key)
586                });
587                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
588                    // Decode all columns, pass max_demand as usize::MAX. Output is 1:1 from the
589                    // cursor (no duplicates), so a non-consolidating container builder is the
590                    // right choice.
591                    let (ok, err) = arranged
592                        .flat_map_ok::<_, CapacityContainerBuilder<Vec<(Row, T, Diff)>>, _>(
593                            None,
594                            usize::MAX,
595                            |borrow, t, r, ok_session| {
596                                ok_session.give((SharedRow::pack(borrow.iter()), t, r));
597                                1
598                            },
599                        );
600                    (ok.as_collection(), err)
601                } else {
602                    #[allow(deprecated)]
603                    arranged.as_collection()
604                }
605            }
606        }
607    }
608
    /// Constructs and applies logic to elements of a collection and returns the results.
    ///
    /// The function applies `logic` on elements. The logic conceptually receives
    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
    ///
    /// If `key_val` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used when we
    /// have an arrangement by that key to leap directly to exactly those records.
    /// It is important that `logic` still guard against data that does not satisfy
    /// this constraint, as this method does not statically know that it will have
    /// that arrangement.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    ///
    /// Returns the stream of `ok` results together with the collection of errors, which is
    /// the input's errors concatenated with the errors written by `logic`.
    ///
    /// # Panics
    ///
    /// Panics if `key_val` is set but no arrangement exists for that key, or if `key_val` is
    /// `None` and the bundle holds no unarranged collection.
    pub fn flat_map<D, DCB, L>(
        &self,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a> FnMut(
                &'a mut DatumVecBorrow<'_>,
                T,
                Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        // If `key_val` is set, we must use the corresponding arrangement.
        // If there isn't one, that implies an error in the contract between
        // key-production and available arrangements.
        if let Some((key, val)) = key_val {
            self.arrangement(&key)
                .expect("Should have ensured during planning that this arrangement exists.")
                .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic)
        } else {
            let (oks, errs) = self
                .collection
                .clone()
                .expect("Invariant violated: CollectionBundle contains no collection.");
            let scope = oks.inner.scope();
            // Hand-built operator with one pipelined input and two outputs: output 0 carries
            // the `ok` results, output 1 the errors produced by `logic`.
            let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope);
            let (ok_output, ok_stream) = builder.new_output();
            let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
            let (err_output, err_stream) = builder.new_output();
            let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
            let mut input = builder.new_input(oks.inner, Pipeline);
            builder.build(move |_capabilities| {
                // Scratch space reused across records to hold the decoded columns.
                let mut datums = DatumVec::new();
                move |_frontiers| {
                    let mut ok_output = ok_output.activate();
                    let mut err_output = err_output.activate();
                    input.for_each(|time, data| {
                        // Retain the input capability to derive a `Capability` for each output;
                        // the `Session` type alias is fixed to `Capability<T>`.
                        let ok_cap = time.retain(0);
                        let err_cap = time.retain(1);
                        let mut ok_session = ok_output.session_with_builder(&ok_cap);
                        let mut err_session = err_output.session_with_builder(&err_cap);
                        for (v, t, d) in data.drain(..) {
                            // The record count returned by `logic` is discarded here: this
                            // path processes each input container in full and is not fueled.
                            logic(
                                &mut datums.borrow_with_limit(&v, max_demand),
                                t,
                                d,
                                &mut ok_session,
                                &mut err_session,
                            );
                        }
                    });
                }
            });
            let errs = errs.concat(err_stream.as_collection());
            (ok_stream, errs)
        }
    }
692
693    /// Factored out common logic for using literal keys in general traces.
694    ///
695    /// This logic is sufficiently interesting that we want to write it only
696    /// once, and thereby avoid any skew in the two uses of the logic.
697    ///
698    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
699    /// where key and value are potentially specialized, but convertible into rows. The `logic`
700    /// callback writes ok results into the first session and errors into the second, returning
701    /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel
702    /// rationale.
703    fn flat_map_core_fallible<Tr, D, DCB, L>(
704        trace: Arranged<'scope, Tr>,
705        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
706        mut logic: L,
707        refuel: usize,
708    ) -> (
709        Stream<'scope, T, DCB::Container>,
710        Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>,
711    )
712    where
713        Tr: for<'a> TraceReader<
714                Key<'a>: ToDatumIter,
715                Val<'a>: ToDatumIter,
716                Time = T,
717                Diff = mz_repr::Diff,
718            > + Clone
719            + 'static,
720        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
721        D: Data,
722        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
723        L: FnMut(
724                Tr::Key<'_>,
725                Tr::Val<'_>,
726                T,
727                mz_repr::Diff,
728                &mut Session<T, DCB>,
729                &mut Session<T, ECB<T>>,
730            ) -> usize
731            + 'static,
732    {
733        let scope = trace.stream.scope();
734
735        let mut key_con = Tr::KeyContainer::with_capacity(1);
736        if let Some(key) = &key {
737            key_con.push_own(key);
738        }
739        let mode = if key.is_some() { "index" } else { "scan" };
740        let name = format!("ArrangementFlatMap({})", mode);
741
742        let mut builder = OperatorBuilder::new(name, scope.clone());
743        let (ok_output, ok_stream) = builder.new_output();
744        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
745        let (err_output, err_stream) = builder.new_output();
746        let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
747        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
748        let operator_info = builder.operator_info();
749
750        builder.build(move |_capabilities| {
751            // Acquire an activator to reschedule the operator when it has unfinished work.
752            let activator = scope.activator_for(operator_info.address);
753            // Maintain a list of work to do, cursor to navigate and process.
754            let mut todo = std::collections::VecDeque::new();
755            move |_frontiers| {
756                let key = key_con.get(0);
757                let mut ok_output = ok_output.activate();
758                let mut err_output = err_output.activate();
759
760                // First, dequeue all batches.
761                input.for_each(|time, data| {
762                    // Retain a capability for each output, as the work may complete across
763                    // multiple activations.
764                    let ok_cap = time.retain(0);
765                    let err_cap = time.retain(1);
766                    for batch in data.iter() {
767                        todo.push_back(PendingWork::new(
768                            ok_cap.clone(),
769                            err_cap.clone(),
770                            batch.cursor(),
771                            batch.clone(),
772                        ));
773                    }
774                });
775
776                // Second, make progress on `todo`.
777                let mut fuel = refuel;
778                while !todo.is_empty() && fuel > 0 {
779                    todo.front_mut().unwrap().do_work(
780                        key.as_ref(),
781                        &mut logic,
782                        &mut fuel,
783                        &mut ok_output,
784                        &mut err_output,
785                    );
786                    if fuel > 0 {
787                        todo.pop_front();
788                    }
789                }
790                // If we have not finished all work, re-activate the operator.
791                if !todo.is_empty() {
792                    activator.activate();
793                }
794            }
795        });
796
797        (ok_stream, err_stream)
798    }
799
800    /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results
801    /// into a single output session and returns the number of records produced (see the
802    /// fallible variant for fuel semantics). Use this when the caller statically knows it
803    /// will never produce `DataflowErrorSer` records, to avoid building a second output port
804    /// and the empty err stream that would follow it.
805    fn flat_map_core_ok<Tr, D, DCB, L>(
806        trace: Arranged<'scope, Tr>,
807        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
808        mut logic: L,
809        refuel: usize,
810    ) -> Stream<'scope, T, DCB::Container>
811    where
812        Tr: for<'a> TraceReader<
813                Key<'a>: ToDatumIter,
814                Val<'a>: ToDatumIter,
815                Time = T,
816                Diff = mz_repr::Diff,
817            > + Clone
818            + 'static,
819        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
820        D: Data,
821        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
822        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session<T, DCB>) -> usize
823            + 'static,
824    {
825        let scope = trace.stream.scope();
826
827        let mut key_con = Tr::KeyContainer::with_capacity(1);
828        if let Some(key) = &key {
829            key_con.push_own(key);
830        }
831        let mode = if key.is_some() { "index" } else { "scan" };
832        let name = format!("ArrangementFlatMapOk({})", mode);
833
834        let mut builder = OperatorBuilder::new(name, scope.clone());
835        let (ok_output, ok_stream) = builder.new_output();
836        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
837        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
838        let operator_info = builder.operator_info();
839
840        builder.build(move |_capabilities| {
841            let activator = scope.activator_for(operator_info.address);
842            let mut todo = std::collections::VecDeque::new();
843            move |_frontiers| {
844                let key = key_con.get(0);
845                let mut ok_output = ok_output.activate();
846
847                input.for_each(|time, data| {
848                    let cap = time.retain(0);
849                    for batch in data.iter() {
850                        todo.push_back(PendingWorkOk::new(
851                            cap.clone(),
852                            batch.cursor(),
853                            batch.clone(),
854                        ));
855                    }
856                });
857
858                let mut fuel = refuel;
859                while !todo.is_empty() && fuel > 0 {
860                    todo.front_mut().unwrap().do_work(
861                        key.as_ref(),
862                        &mut logic,
863                        &mut fuel,
864                        &mut ok_output,
865                    );
866                    if fuel > 0 {
867                        todo.pop_front();
868                    }
869                }
870                if !todo.is_empty() {
871                    activator.activate();
872                }
873            }
874        });
875
876        ok_stream
877    }
878
879    /// Look up an arrangement by the expressions that form the key.
880    ///
881    /// The result may be `None` if no such arrangement exists, or it may be one of many
882    /// "arrangement flavors" that represent the types of arranged data we might have.
883    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
884        self.arranged.get(key).map(|x| x.clone())
885    }
886}
887
888impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
889    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
890    ///
891    /// This operator is able to apply the logic of `mfp` early, which can substantially
892    /// reduce the amount of data produced when `mfp` is non-trivial.
893    ///
894    /// The `key_val` argument, when present, indicates that a specific arrangement should
895    /// be used, and if, in addition, the `val` component is present,
896    /// that we can seek to the supplied row.
897    pub fn as_collection_core(
898        &self,
899        mut mfp: MapFilterProject,
900        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
901        until: Antichain<mz_repr::Timestamp>,
902        config_set: &ConfigSet,
903    ) -> (
904        VecCollection<'scope, T, mz_repr::Row, Diff>,
905        VecCollection<'scope, T, DataflowErrorSer, Diff>,
906    ) {
907        mfp.optimize();
908        let mfp_plan = mfp.clone().into_plan().unwrap();
909
910        // If the MFP is trivial, we can just call `as_collection`.
911        // In the case that we weren't going to apply the `key_val` optimization,
912        // this path results in a slightly smaller and faster
913        // dataflow graph, and is intended to fix
914        // https://github.com/MaterializeInc/database-issues/issues/3111
915        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
916            true
917        } else {
918            false
919        };
920
921        if mfp_plan.is_identity() && !has_key_val {
922            let key = key_val.map(|(k, _v)| k);
923            return self.as_specific_collection(key.as_deref(), config_set);
924        }
925
926        let max_demand = mfp.demand().last().map(|x| *x + 1).unwrap_or(0);
927        mfp.permute_fn(|c| c, max_demand);
928        mfp.optimize();
929        let mfp_plan = mfp.into_plan().unwrap();
930
931        let mut datum_vec = DatumVec::new();
932        // Wrap in an `Rc` so that lifetimes work out.
933        let until = std::rc::Rc::new(until);
934
935        let (stream, errors) = self
936            .flat_map::<_, ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>, _>(
937                key_val,
938                max_demand,
939                move |row_datums, time, diff, ok_session, err_session| {
940                    let mut row_builder = SharedRow::get();
941                    let until = std::rc::Rc::clone(&until);
942                    let temp_storage = RowArena::new();
943                    let row_iter = row_datums.iter();
944                    let mut datums_local = datum_vec.borrow();
945                    datums_local.extend(row_iter);
946                    let event_time = time.event_time();
947                    let mut work: usize = 0;
948                    for result in mfp_plan.evaluate(
949                        &mut datums_local,
950                        &temp_storage,
951                        event_time,
952                        diff.clone(),
953                        move |time| !until.less_equal(time),
954                        &mut row_builder,
955                    ) {
956                        work += 1;
957                        match result {
958                            Ok((row, event_time, diff)) => {
959                                // Copy the whole time, and re-populate event time.
960                                let mut time: T = time.clone();
961                                *time.event_time_mut() = event_time;
962                                ok_session.give((row, time, diff));
963                            }
964                            Err((e, event_time, diff)) => {
965                                // Copy the whole time, and re-populate event time.
966                                let mut time: T = time.clone();
967                                *time.event_time_mut() = event_time;
968                                err_session.give((e, time, diff));
969                            }
970                        }
971                    }
972                    work
973                },
974            );
975
976        (stream.as_collection(), errors)
977    }
978    pub fn ensure_collections(
979        mut self,
980        collections: AvailableCollections,
981        input_key: Option<Vec<MirScalarExpr>>,
982        input_mfp: MapFilterProject,
983        as_of: Antichain<mz_repr::Timestamp>,
984        until: Antichain<mz_repr::Timestamp>,
985        config_set: &ConfigSet,
986        strategy: ArrangementStrategy,
987    ) -> Self
988    where
989        T: MaybeBucketByTime,
990    {
991        if collections == Default::default() {
992            return self;
993        }
994        // Cache collection to avoid reforming it each time.
995        //
996        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
997        // as the `map_fallible` that follows could be run against an arrangement itself.
998        //
999        // Note(btv): If we ever do that, we would then only need to make the raw collection here
1000        // if `collections.raw` is true.
1001
1002        for (key, _, _) in collections.arranged.iter() {
1003            soft_assert_or_log!(
1004                !self.arranged.contains_key(key),
1005                "LIR ArrangeBy tried to create an existing arrangement"
1006            );
1007        }
1008
1009        // Track whether we already applied temporal bucketing in this call, to
1010        // avoid bucketing the same updates twice.
1011        let mut bucketed = false;
1012
1013        // True iff at least one new arrangement will actually be built below. Bucketing only
1014        // pays off when something downstream merges/compacts the future-stamped updates; on a
1015        // pure raw collection (no new arrangement) the work is wasted.
1016        let will_create_arrangement = collections
1017            .arranged
1018            .iter()
1019            .any(|(key, _, _)| !self.arranged.contains_key(key));
1020
1021        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
1022        let form_raw_collection = collections.raw || will_create_arrangement;
1023        if form_raw_collection && self.collection.is_none() {
1024            let (oks, errs) =
1025                self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
1026            // Apply temporal bucketing when the lowering selected `TemporalBucketing` and
1027            // we will build at least one arrangement. This path fires when the collection
1028            // must be formed from scratch (e.g., from an arrangement via as_collection_core).
1029            let effective_strategy = if will_create_arrangement {
1030                strategy
1031            } else {
1032                ArrangementStrategy::Direct
1033            };
1034            let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1035                && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1036            {
1037                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1038                    .get(config_set)
1039                    .try_into()
1040                    .expect("must fit");
1041                bucketed = true;
1042                T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1043            } else {
1044                oks
1045            };
1046            self.collection = Some((oks, errs));
1047        }
1048        for (key, _, thinning) in collections.arranged {
1049            if !self.arranged.contains_key(&key) {
1050                // TODO: Consider allowing more expressive names.
1051                let name = format!("ArrangeBy[{:?}]", key);
1052
1053                let (oks, errs) = self
1054                    .collection
1055                    .take()
1056                    .expect("Collection constructed above");
1057                // Apply temporal bucketing if the collection already existed on
1058                // the bundle (e.g., from an upstream temporal Mfp or Get) and we
1059                // haven't bucketed yet. This is the common path for temporal-MFP
1060                // → ArrangeBy flows.
1061                let effective_strategy = if bucketed {
1062                    ArrangementStrategy::Direct
1063                } else {
1064                    strategy
1065                };
1066                let oks = if matches!(effective_strategy, ArrangementStrategy::TemporalBucketing)
1067                    && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1068                {
1069                    let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1070                        .get(config_set)
1071                        .try_into()
1072                        .expect("must fit");
1073                    bucketed = true;
1074                    T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1075                } else {
1076                    oks
1077                };
1078                let use_paged_path = ENABLE_COLUMN_PAGED_BATCHER.get(config_set);
1079                let (oks, errs_keyed, passthrough) = Self::arrange_collection(
1080                    &name,
1081                    oks,
1082                    key.clone(),
1083                    thinning.clone(),
1084                    use_paged_path,
1085                );
1086                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
1087                self.collection = Some((passthrough, errs));
1088                let errs =
1089                    errs_concat.mz_arrange::<
1090                        ColumnationChunker<_>,
1091                        ErrBatcher<_, _>,
1092                        ErrBuilder<_, _>,
1093                        ErrSpine<_, _>,
1094                    >(
1095                        &format!("{}-errors", name),
1096                    );
1097                self.arranged
1098                    .insert(key, ArrangementFlavor::Local(oks, errs));
1099            }
1100        }
1101        self
1102    }
1103
1104    /// Builds an arrangement from a collection, using the specified key and value thinning.
1105    ///
1106    /// The arrangement's key is based on the `key` expressions, and the value the input with
1107    /// the `thinning` applied to it. It selects which of the input columns are included in the
1108    /// value of the arrangement. The thinning is in support of permuting arrangements such that
1109    /// columns in the key are not included in the value.
1110    ///
1111    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
1112    /// the input as-is, which allows downstream consumers to reuse the collection without
1113    /// teeing the stream.
1114    fn arrange_collection(
1115        name: &String,
1116        oks: VecCollection<'scope, T, Row, Diff>,
1117        key: Vec<MirScalarExpr>,
1118        thinning: Vec<usize>,
1119        use_paged_path: bool,
1120    ) -> (
1121        Arranged<'scope, RowRowAgent<T, Diff>>,
1122        VecCollection<'scope, T, DataflowErrorSer, Diff>,
1123        VecCollection<'scope, T, Row, Diff>,
1124    ) {
1125        // This operator implements a `map_fallible`, but produces columnar updates for the ok
1126        // stream. The `map_fallible` cannot be used here because the closure cannot return
1127        // references, which is what we need to push into columnar streams. Instead, we use a
1128        // bespoke operator that also optimizes reuse of allocations across individual updates.
1129        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
1130        let (ok_output, ok_stream) = builder.new_output();
1131        let mut ok_output =
1132            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
1133        let (err_output, err_stream) = builder.new_output();
1134        let mut err_output = OutputBuilder::from(err_output);
1135        let (passthrough_output, passthrough_stream) = builder.new_output();
1136        let mut passthrough_output = OutputBuilder::from(passthrough_output);
1137        let mut input = builder.new_input(oks.inner, Pipeline);
1138        builder.set_notify_for(0, FrontierInterest::Never);
1139        builder.build(move |_capabilities| {
1140            let mut key_buf = Row::default();
1141            let mut val_buf = Row::default();
1142            let mut datums = DatumVec::new();
1143            let mut temp_storage = RowArena::new();
1144            move |_frontiers| {
1145                let mut ok_output = ok_output.activate();
1146                let mut err_output = err_output.activate();
1147                let mut passthrough_output = passthrough_output.activate();
1148                input.for_each(|time, data| {
1149                    let mut ok_session = ok_output.session_with_builder(&time);
1150                    let mut err_session = err_output.session(&time);
1151                    for (row, time, diff) in data.iter() {
1152                        temp_storage.clear();
1153                        let datums = datums.borrow_with(row);
1154                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
1155                        match key_buf.packer().try_extend(key_iter) {
1156                            Ok(()) => {
1157                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
1158                                val_buf.packer().extend(val_datum_iter);
1159                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
1160                            }
1161                            Err(e) => {
1162                                err_session.give((e.into(), time.clone(), *diff));
1163                            }
1164                        }
1165                    }
1166                    passthrough_output.session(&time).give_container(data);
1167                });
1168            }
1169        });
1170
1171        let exchange =
1172            ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, T, Diff>);
1173        let oks = if use_paged_path {
1174            ok_stream.mz_arrange_core::<
1175                _,
1176                batcher::ColumnChunker<_>,
1177                Col2ValPagedBatcher<_, _, _, _>,
1178                RowRowColPagedBuilder<_, _>,
1179                RowRowSpine<_, _>,
1180            >(exchange, name)
1181        } else {
1182            ok_stream.mz_arrange_core::<
1183                _,
1184                batcher::Chunker<_>,
1185                Col2ValBatcher<_, _, _, _>,
1186                RowRowBuilder<_, _>,
1187                RowRowSpine<_, _>,
1188            >(exchange, name)
1189        };
1190        (
1191            oks,
1192            err_stream.as_collection(),
1193            passthrough_stream.as_collection(),
1194        )
1195    }
1196}
1197
/// A timely output `Session` whose capability type is fixed to `Capability<T>`.
///
/// The container builder `CB` is left to the caller, so sessions can drive consolidating,
/// capacity, or (in the future) columnar output builders without changing call sites.
type Session<'a, 'b, T, CB> =
    timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability<T>>;
1203
/// Container builder used for the err output of every flat_map variant.
///
/// Before the refactor, the merged Ok/Err stream flowed through a
/// [`ConsolidatingContainerBuilder`] before the `map_fallible` demux split it. We preserve
/// that consolidation here so that errors with the same `(error, time)` cancel within a
/// batch rather than propagating downstream.
type ECB<T> = ConsolidatingContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>;
1209
/// Number of output records the arrangement flat_map operators may produce before yielding.
///
/// See [`ArrangementFlavor::flat_map`] for the fuel rationale. The value is a pragmatic
/// compromise and has not been tuned empirically.
const REFUEL: usize = 1_000_000;
1214
1215struct PendingWork<C>
1216where
1217    C: Cursor,
1218{
1219    /// Capability for the `ok` output (output port 0).
1220    ok_capability: Capability<C::Time>,
1221    /// Capability for the `err` output (output port 1).
1222    err_capability: Capability<C::Time>,
1223    cursor: C,
1224    batch: C::Storage,
1225}
1226
1227impl<C> PendingWork<C>
1228where
1229    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1230{
1231    /// Create a new bundle of pending work, from a pair of capabilities (one per output),
1232    /// a cursor, and backing storage.
1233    fn new(
1234        ok_capability: Capability<C::Time>,
1235        err_capability: Capability<C::Time>,
1236        cursor: C,
1237        batch: C::Storage,
1238    ) -> Self {
1239        Self {
1240            ok_capability,
1241            err_capability,
1242            cursor,
1243            batch,
1244        }
1245    }
1246    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to
1247    /// the two output sessions.
1248    fn do_work<D, DCB, L>(
1249        &mut self,
1250        key: Option<&C::Key<'_>>,
1251        logic: &mut L,
1252        fuel: &mut usize,
1253        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1254        err_output: &mut OutputBuilderSession<'_, C::Time, ECB<C::Time>>,
1255    ) where
1256        D: Data,
1257        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1258        L: FnMut(
1259                C::Key<'_>,
1260                C::Val<'_>,
1261                C::Time,
1262                C::Diff,
1263                &mut Session<C::Time, DCB>,
1264                &mut Session<C::Time, ECB<C::Time>>,
1265            ) -> usize
1266            + 'static,
1267    {
1268        let mut ok_session = ok_output.session_with_builder(&self.ok_capability);
1269        let mut err_session = err_output.session_with_builder(&self.err_capability);
1270        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1271            logic(k, v, t, d, &mut ok_session, &mut err_session)
1272        });
1273    }
1274}
1275
1276/// Pending work for the Ok-only variant of `flat_map_core_fallible`. Holds a single capability since
1277/// the operator has only one output port.
1278struct PendingWorkOk<C>
1279where
1280    C: Cursor,
1281{
1282    capability: Capability<C::Time>,
1283    cursor: C,
1284    batch: C::Storage,
1285}
1286
1287impl<C> PendingWorkOk<C>
1288where
1289    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1290{
1291    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
1292        Self {
1293            capability,
1294            cursor,
1295            batch,
1296        }
1297    }
1298
1299    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to
1300    /// the single output session.
1301    fn do_work<D, DCB, L>(
1302        &mut self,
1303        key: Option<&C::Key<'_>>,
1304        logic: &mut L,
1305        fuel: &mut usize,
1306        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1307    ) where
1308        D: Data,
1309        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1310        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session<C::Time, DCB>) -> usize
1311            + 'static,
1312    {
1313        let mut ok_session = ok_output.session_with_builder(&self.capability);
1314        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1315            logic(k, v, t, d, &mut ok_session)
1316        });
1317    }
1318}
1319
1320/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If
1321/// `key` is set, the cursor is seeked to it and only values for that key are produced.
1322///
1323/// `emit` returns the number of records it produced for the given input tuple. The cursor
1324/// stops as soon as the accumulated emit count reaches `*fuel`, leaving the cursor in place
1325/// so work can resume on a later call. Within a batch, both the inner val loop and the
1326/// outer key loop are bounded only by emit count, so selective filters (`emit` returns 0)
1327/// run to batch completion in a single activation — see [`ArrangementFlavor::flat_map`]
1328/// for why fuel counts output rather than input.
1329fn walk_cursor<C, F>(
1330    cursor: &mut C,
1331    batch: &C::Storage,
1332    key: Option<&C::Key<'_>>,
1333    fuel: &mut usize,
1334    mut emit: F,
1335) where
1336    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1337    F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize,
1338{
1339    use differential_dataflow::consolidation::consolidate;
1340
1341    let mut work: usize = 0;
1342    let mut buffer = Vec::new();
1343    if let Some(key) = key {
1344        let key = C::KeyContainer::reborrow(*key);
1345        if cursor.get_key(batch).map(|k| k == key) != Some(true) {
1346            cursor.seek_key(batch, key);
1347        }
1348        if cursor.get_key(batch).map(|k| k == key) == Some(true) {
1349            let key = cursor.key(batch);
1350            while let Some(val) = cursor.get_val(batch) {
1351                cursor.map_times(batch, |time, diff| {
1352                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
1353                });
1354                consolidate(&mut buffer);
1355                for (time, diff) in buffer.drain(..) {
1356                    work += emit(key, val, time, diff);
1357                }
1358                cursor.step_val(batch);
1359                if work >= *fuel {
1360                    *fuel = 0;
1361                    return;
1362                }
1363            }
1364        }
1365    } else {
1366        while let Some(key) = cursor.get_key(batch) {
1367            while let Some(val) = cursor.get_val(batch) {
1368                cursor.map_times(batch, |time, diff| {
1369                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
1370                });
1371                consolidate(&mut buffer);
1372                for (time, diff) in buffer.drain(..) {
1373                    work += emit(key, val, time, diff);
1374                }
1375                cursor.step_val(batch);
1376                if work >= *fuel {
1377                    *fuel = 0;
1378                    return;
1379                }
1380            }
1381            cursor.step_key(batch);
1382        }
1383    }
1384    *fuel -= work;
1385}