// mz_compute/render/context.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Management of dataflow-local state, like arrangements, while building a
//! dataflow.
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Data, VecCollection};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION, ENABLE_COMPUTE_TEMPORAL_BUCKETING,
    TEMPORAL_BUCKETING_SUMMARY,
};
use mz_compute_types::plan::{ArrangementStrategy, AvailableCollections};
use mz_dyncfg::ConfigSet;
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_ore::soft_assert_or_log;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
use timely::ContainerBuilder;
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
use timely::dataflow::{Scope, Stream};
use timely::progress::operate::FrontierInterest;
use timely::progress::{Antichain, Timestamp};

use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange, MzArrangeCore};
use crate::render::errors::{DataflowErrorSer, ErrorLogger};
use crate::render::{LinearJoinSpec, MaybeBucketByTime, RenderTimestamp};
use crate::row_spine::{DatumSeq, RowRowBuilder};
use crate::typedefs::{
    ErrAgent, ErrBatcher, ErrBuilder, ErrEnter, ErrSpine, RowRowAgent, RowRowEnter, RowRowSpine,
};
53
/// Dataflow-local collections and arrangements.
///
/// A context means to wrap available data assets and present them in an easy-to-use manner.
/// These assets include dataflow-local collections and arrangements, as well as imported
/// arrangements from outside the dataflow.
///
/// Context has a timestamp type `T`, which is the timestamp used by the scope in question.
pub struct Context<'scope, T: RenderTimestamp> {
    /// The scope within which all managed collections exist.
    ///
    /// It is an error to add any collections not contained in this scope.
    pub(crate) scope: Scope<'scope, T>,
    /// The debug name of the dataflow associated with this context.
    pub debug_name: String,
    /// The Timely ID of the dataflow associated with this context.
    pub dataflow_id: usize,
    /// The collection IDs of exports of the dataflow associated with this context.
    pub export_ids: Vec<GlobalId>,
    /// Frontier before which updates should not be emitted.
    ///
    /// We *must* apply it to sinks, to ensure correct outputs.
    /// We *should* apply it to sources and imported traces, because it improves performance.
    pub as_of_frontier: Antichain<mz_repr::Timestamp>,
    /// Frontier after which updates should not be emitted.
    /// Used to limit the amount of work done when appropriate.
    pub until: Antichain<mz_repr::Timestamp>,
    /// Bindings of identifiers to collections.
    ///
    /// Populated via `insert_id`/`update_id` and queried via `lookup_id`.
    pub bindings: BTreeMap<Id, CollectionBundle<'scope, T>>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    ///
    /// `None` for transient dataflows (see `for_dataflow_in`).
    pub(super) compute_logger: Option<crate::logging::compute::Logger>,
    /// Specification for rendering linear joins.
    pub(super) linear_join_spec: LinearJoinSpec,
    /// The expiration time for dataflows in this context. The output's frontier should never advance
    /// past this frontier, except the empty frontier.
    pub dataflow_expiration: Antichain<mz_repr::Timestamp>,
    /// The config set for this context.
    pub config_set: Rc<ConfigSet>,
}
92
93impl<'scope, T: RenderTimestamp> Context<'scope, T> {
94    /// Creates a new empty Context.
95    pub fn for_dataflow_in<Plan>(
96        dataflow: &DataflowDescription<Plan, CollectionMetadata>,
97        scope: Scope<'scope, T>,
98        compute_state: &ComputeState,
99        until: Antichain<mz_repr::Timestamp>,
100        dataflow_expiration: Antichain<mz_repr::Timestamp>,
101    ) -> Self {
102        use mz_ore::collections::CollectionExt as IteratorExt;
103        let dataflow_id = *scope.addr().into_first();
104        let as_of_frontier = dataflow
105            .as_of
106            .clone()
107            .unwrap_or_else(|| Antichain::from_elem(Timestamp::minimum()));
108
109        let export_ids = dataflow.export_ids().collect();
110
111        // Skip compute event logging for transient dataflows. We do this to avoid overhead for
112        // slow-path peeks, but it also affects subscribes. For now that seems fine, but we may
113        // want to reconsider in the future.
114        let compute_logger = if dataflow.is_transient() {
115            None
116        } else {
117            compute_state.compute_logger.clone()
118        };
119
120        Self {
121            scope,
122            debug_name: dataflow.debug_name.clone(),
123            dataflow_id,
124            export_ids,
125            as_of_frontier,
126            until,
127            bindings: BTreeMap::new(),
128            compute_logger,
129            linear_join_spec: compute_state.linear_join_spec,
130            dataflow_expiration,
131            config_set: Rc::clone(&compute_state.worker_config),
132        }
133    }
134}
135
136impl<'scope, T: RenderTimestamp> Context<'scope, T> {
137    /// Insert a collection bundle by an identifier.
138    ///
139    /// This is expected to be used to install external collections (sources, indexes, other views),
140    /// as well as for `Let` bindings of local collections.
141    pub fn insert_id(
142        &mut self,
143        id: Id,
144        collection: CollectionBundle<'scope, T>,
145    ) -> Option<CollectionBundle<'scope, T>> {
146        self.bindings.insert(id, collection)
147    }
148    /// Remove a collection bundle by an identifier.
149    ///
150    /// The primary use of this method is uninstalling `Let` bindings.
151    pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<'scope, T>> {
152        self.bindings.remove(&id)
153    }
154    /// Melds a collection bundle to whatever exists.
155    pub fn update_id(&mut self, id: Id, collection: CollectionBundle<'scope, T>) {
156        if !self.bindings.contains_key(&id) {
157            self.bindings.insert(id, collection);
158        } else {
159            let binding = self
160                .bindings
161                .get_mut(&id)
162                .expect("Binding verified to exist");
163            if collection.collection.is_some() {
164                binding.collection = collection.collection;
165            }
166            for (key, flavor) in collection.arranged.into_iter() {
167                binding.arranged.insert(key, flavor);
168            }
169        }
170    }
171    /// Look up a collection bundle by an identifier.
172    pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<'scope, T>> {
173        self.bindings.get(&id).cloned()
174    }
175
176    pub(super) fn error_logger(&self) -> ErrorLogger {
177        ErrorLogger::new(self.debug_name.clone())
178    }
179}
180
181impl<'scope, T: RenderTimestamp> Context<'scope, T> {
182    /// Brings the underlying arrangements and collections into a region.
183    pub fn enter_region<'a>(
184        &self,
185        region: Scope<'a, T>,
186        bindings: Option<&std::collections::BTreeSet<Id>>,
187    ) -> Context<'a, T> {
188        let bindings = self
189            .bindings
190            .iter()
191            .filter(|(key, _)| bindings.as_ref().map(|b| b.contains(key)).unwrap_or(true))
192            .map(|(key, bundle)| (*key, bundle.enter_region(region)))
193            .collect();
194
195        Context {
196            scope: region,
197            debug_name: self.debug_name.clone(),
198            dataflow_id: self.dataflow_id.clone(),
199            export_ids: self.export_ids.clone(),
200            as_of_frontier: self.as_of_frontier.clone(),
201            until: self.until.clone(),
202            compute_logger: self.compute_logger.clone(),
203            linear_join_spec: self.linear_join_spec.clone(),
204            bindings,
205            dataflow_expiration: self.dataflow_expiration.clone(),
206            config_set: Rc::clone(&self.config_set),
207        }
208    }
209}
210
/// Describes flavor of arrangement: local or imported trace.
#[derive(Clone)]
pub enum ArrangementFlavor<'scope, T: RenderTimestamp> {
    /// A dataflow-local arrangement.
    ///
    /// Carries the ok-arrangement and the error arrangement, both timestamped with `T`.
    Local(
        Arranged<'scope, RowRowAgent<T, Diff>>,
        Arranged<'scope, ErrAgent<T, Diff>>,
    ),
    /// An imported trace from outside the dataflow.
    ///
    /// The `GlobalId` identifier exists so that exports of this same trace
    /// can refer back to and depend on the original instance.
    ///
    /// The imported traces are timestamped with the outer `mz_repr::Timestamp` and
    /// entered into the `T`-timestamped scope (note the `*Enter` trace types).
    Trace(
        GlobalId,
        Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>,
        Arranged<'scope, ErrEnter<mz_repr::Timestamp, T>>,
    ),
}
229
impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
    /// Presents `self` as a stream of updates.
    ///
    /// Deprecated: This function is not fueled and hence risks flattening the whole arrangement.
    ///
    /// This method presents the contents as they are, without further computation.
    /// If you have logic that could be applied to each record, consider using the
    /// `flat_map` methods which allows this and can reduce the work done.
    #[deprecated(note = "Use `flat_map` instead.")]
    pub fn as_collection(
        &self,
    ) -> (
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        let mut datums = DatumVec::new();
        // Re-concatenate the key and value datums into a single `Row` per record.
        let logic = move |k: DatumSeq, v: DatumSeq| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k);
            datums_borrow.extend(v);
            SharedRow::pack(&**datums_borrow)
        };
        match &self {
            ArrangementFlavor::Local(oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
            ArrangementFlavor::Trace(_, oks, errs) => (
                oks.clone().as_collection(logic),
                errs.clone().as_collection(|k, &()| k.clone()),
            ),
        }
    }

    /// Constructs and applies logic to elements of `self` and returns the results.
    ///
    /// The `logic` callback receives a borrow of the decoded datum vector, a timestamp, a
    /// diff, and two output sessions: one for `ok` updates of type `(D, T, Diff)` and one for
    /// MFP-style `DataflowErrorSer` updates. It must return the number of records *produced*
    /// (written to either session), not the number of input tuples consumed.
    ///
    /// # Fuel
    ///
    /// The operator accumulates the returned counts as fuel and yields when the total reaches
    /// an internal refuel threshold. The metric is output-produced (not input-consumed) on
    /// purpose: it regulates two asymmetric pressures.
    ///
    /// * **Drain inputs.** The operator holds a clone of each pending `Batch` until its work
    ///   item pops; we want to release that memory back to the upstream arrangement as soon
    ///   as possible. A `filter(false)` MFP returns 0 for every tuple, so fuel never trips
    ///   and the cursor runs to end-of-batch in one activation.
    /// * **Throttle outputs.** A `map("1KB-string")` MFP produces large records per input;
    ///   stopping when emit count hits the threshold caps how much data a single activation
    ///   dumps on the next operator.
    ///
    /// The refuel constant is a pragmatic compromise: large enough to be a non-event in
    /// steady-state, small enough that one activation can't flood downstream. There is no
    /// universal value across MFP shapes.
    ///
    /// If `key` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used to
    /// leap directly to exactly those records.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    //
    // NOTE(review): the `Session`/`ECB` aliases and the `REFUEL` constant are defined
    // elsewhere in this module (not visible here).
    pub fn flat_map<D, DCB, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a, 'b> FnMut(
                &'a mut DatumVecBorrow<'b>,
                T,
                Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        let mut datums = DatumVec::new();
        // Adapt the caller's datum-vector callback to the key/value shape the core
        // expects, decoding at most `max_demand` columns across key then value.
        let logic = move |k: DatumSeq,
                          v: DatumSeq,
                          t,
                          d,
                          ok_session: &mut Session<T, DCB>,
                          err_session: &mut Session<T, ECB<T>>| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            // Remaining demand after the key columns have been decoded.
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d, ok_session, err_session)
        };

        // Both arms are identical except for the trace type; in each, MFP-produced
        // errors are merged with the arrangement's own error collection.
        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                let errs = errs.concat(mfp_errs.as_collection());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let (oks, mfp_errs) = CollectionBundle::<T>::flat_map_core_fallible::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                let errs = errs.concat(mfp_errs.as_collection());
                (oks, errs)
            }
        }
    }

    /// Ok-only variant of [`Self::flat_map`]. The `logic` callback receives a single output
    /// session, cannot produce errors, and returns the number of records produced (see
    /// [`Self::flat_map`] for fuel semantics). The returned err collection comes solely from
    /// the arrangement; no extra operator is built to carry an empty MFP-error stream.
    pub fn flat_map_ok<D, DCB, L>(
        &self,
        key: Option<&Row>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a, 'b> FnMut(&'a mut DatumVecBorrow<'b>, T, Diff, &mut Session<T, DCB>) -> usize
            + 'static,
    {
        let mut datums = DatumVec::new();
        // Same demand-limited key/value decoding as `flat_map`, minus the error session.
        let logic = move |k: DatumSeq, v: DatumSeq, t, d, ok_session: &mut Session<T, DCB>| {
            let mut datums_borrow = datums.borrow();
            datums_borrow.extend(k.to_datum_iter().take(max_demand));
            let max_demand = max_demand.saturating_sub(datums_borrow.len());
            datums_borrow.extend(v.to_datum_iter().take(max_demand));
            logic(&mut datums_borrow, t, d, ok_session)
        };

        match &self {
            ArrangementFlavor::Local(oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
            ArrangementFlavor::Trace(_, oks, errs) => {
                let oks = CollectionBundle::<T>::flat_map_core_ok::<_, _, DCB, _>(
                    oks.clone(),
                    key,
                    logic,
                    REFUEL,
                );
                let errs = errs.clone().as_collection(|k, &()| k.clone());
                (oks, errs)
            }
        }
    }
}
409impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
410    /// The scope containing the collection bundle.
411    pub fn scope(&self) -> Scope<'scope, T> {
412        match self {
413            ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(),
414            ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(),
415        }
416    }
417
418    /// Brings the arrangement flavor into a region.
419    pub fn enter_region<'a>(&self, region: Scope<'a, T>) -> ArrangementFlavor<'a, T> {
420        match self {
421            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
422                oks.clone().enter_region(region),
423                errs.clone().enter_region(region),
424            ),
425            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
426                *gid,
427                oks.clone().enter_region(region),
428                errs.clone().enter_region(region),
429            ),
430        }
431    }
432}
433impl<'scope, T: RenderTimestamp> ArrangementFlavor<'scope, T> {
434    /// Extracts the arrangement flavor from a region.
435    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> ArrangementFlavor<'outer, T> {
436        match self {
437            ArrangementFlavor::Local(oks, errs) => ArrangementFlavor::Local(
438                oks.clone().leave_region(outer),
439                errs.clone().leave_region(outer),
440            ),
441            ArrangementFlavor::Trace(gid, oks, errs) => ArrangementFlavor::Trace(
442                *gid,
443                oks.clone().leave_region(outer),
444                errs.clone().leave_region(outer),
445            ),
446        }
447    }
448}
449
/// A bundle of the various ways a collection can be represented.
///
/// This type maintains the invariant that it does contain at least one valid
/// source of data, either a collection or at least one arrangement.
#[derive(Clone)]
pub struct CollectionBundle<'scope, T: RenderTimestamp> {
    /// The collection as unarranged (ok, err) update streams, if maintained.
    pub collection: Option<(
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )>,
    /// Arrangements of the collection, keyed by the expressions they are arranged by.
    pub arranged: BTreeMap<Vec<MirScalarExpr>, ArrangementFlavor<'scope, T>>,
}
462
463impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
464    /// Construct a new collection bundle from update streams.
465    pub fn from_collections(
466        oks: VecCollection<'scope, T, Row, Diff>,
467        errs: VecCollection<'scope, T, DataflowErrorSer, Diff>,
468    ) -> Self {
469        Self {
470            collection: Some((oks, errs)),
471            arranged: BTreeMap::default(),
472        }
473    }
474
475    /// Inserts arrangements by the expressions on which they are keyed.
476    pub fn from_expressions(
477        exprs: Vec<MirScalarExpr>,
478        arrangements: ArrangementFlavor<'scope, T>,
479    ) -> Self {
480        let mut arranged = BTreeMap::new();
481        arranged.insert(exprs, arrangements);
482        Self {
483            collection: None,
484            arranged,
485        }
486    }
487
488    /// Inserts arrangements by the columns on which they are keyed.
489    pub fn from_columns<I: IntoIterator<Item = usize>>(
490        columns: I,
491        arrangements: ArrangementFlavor<'scope, T>,
492    ) -> Self {
493        let mut keys = Vec::new();
494        for column in columns {
495            keys.push(MirScalarExpr::column(column));
496        }
497        Self::from_expressions(keys, arrangements)
498    }
499
500    /// The scope containing the collection bundle.
501    pub fn scope(&self) -> Scope<'scope, T> {
502        if let Some((oks, _errs)) = &self.collection {
503            oks.inner.scope()
504        } else {
505            self.arranged
506                .values()
507                .next()
508                .expect("Must contain a valid collection")
509                .scope()
510        }
511    }
512
513    /// Brings the collection bundle into a region.
514    pub fn enter_region<'inner>(&self, region: Scope<'inner, T>) -> CollectionBundle<'inner, T> {
515        CollectionBundle {
516            collection: self.collection.as_ref().map(|(oks, errs)| {
517                (
518                    oks.clone().enter_region(region),
519                    errs.clone().enter_region(region),
520                )
521            }),
522            arranged: self
523                .arranged
524                .iter()
525                .map(|(key, bundle)| (key.clone(), bundle.enter_region(region)))
526                .collect(),
527        }
528    }
529}
530
531impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
532    /// Extracts the collection bundle from a region.
533    pub fn leave_region<'outer>(&self, outer: Scope<'outer, T>) -> CollectionBundle<'outer, T> {
534        CollectionBundle {
535            collection: self.collection.as_ref().map(|(oks, errs)| {
536                (
537                    oks.clone().leave_region(outer),
538                    errs.clone().leave_region(outer),
539                )
540            }),
541            arranged: self
542                .arranged
543                .iter()
544                .map(|(key, bundle)| (key.clone(), bundle.leave_region(outer)))
545                .collect(),
546        }
547    }
548}
549
550impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
    /// Asserts that the arrangement for a specific key
    /// (or the raw collection for no key) exists,
    /// and returns the corresponding collection.
    ///
    /// This returns the collection as-is, without
    /// doing any unthinning transformation.
    /// Therefore, it should be used when the appropriate transformation
    /// was planned as part of a following MFP.
    ///
    /// If `key` is specified, the function converts the arrangement to a collection. It uses either
    /// the fueled `flat_map` or `as_collection` method, depending on the flag
    /// [`ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION`].
    ///
    /// # Panics
    ///
    /// Panics if the requested collection (for `key == None`) or arrangement
    /// (for `key == Some(..)`) does not exist in this bundle.
    pub fn as_specific_collection(
        &self,
        key: Option<&[MirScalarExpr]>,
        config_set: &ConfigSet,
    ) -> (
        VecCollection<'scope, T, Row, Diff>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    ) {
        // Any operator that uses this method was told to use a particular
        // collection during LIR planning, where we should have made
        // sure that that collection exists.
        //
        // If it doesn't, we panic.
        match key {
            None => self
                .collection
                .clone()
                .expect("The unarranged collection doesn't exist."),
            Some(key) => {
                let arranged = self.arranged.get(key).unwrap_or_else(|| {
                    panic!("The collection arranged by {:?} doesn't exist.", key)
                });
                if ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION.get(config_set) {
                    // Decode all columns, pass max_demand as usize::MAX. Output is 1:1 from the
                    // cursor (no duplicates), so a non-consolidating container builder is the
                    // right choice.
                    let (ok, err) = arranged
                        .flat_map_ok::<_, CapacityContainerBuilder<Vec<(Row, T, Diff)>>, _>(
                            None,
                            usize::MAX,
                            // Re-pack the decoded datums into a `Row`; each input yields
                            // exactly one output, hence the returned fuel count of 1.
                            |borrow, t, r, ok_session| {
                                ok_session.give((SharedRow::pack(borrow.iter()), t, r));
                                1
                            },
                        );
                    (ok.as_collection(), err)
                } else {
                    // The non-fueled path risks flattening the whole arrangement at once.
                    #[allow(deprecated)]
                    arranged.as_collection()
                }
            }
        }
    }
606
    /// Constructs and applies logic to elements of a collection and returns the results.
    ///
    /// The function applies `logic` on elements. The logic conceptually receives
    /// `(&Row, &Row)` pairs in the form of a datum vec in the expected order.
    ///
    /// If `key_val` is set, this is a promise that `logic` will produce no results on
    /// records for which the key does not evaluate to the value. This is used when we
    /// have an arrangement by that key to leap directly to exactly those records.
    /// It is important that `logic` still guard against data that does not satisfy
    /// this constraint, as this method does not statically know that it will have
    /// that arrangement.
    ///
    /// The `max_demand` parameter limits the number of columns decoded from the
    /// input. Only the first `max_demand` columns are decoded. Pass `usize::MAX` to
    /// decode all columns.
    ///
    /// # Panics
    ///
    /// Panics if `key_val` names an arrangement this bundle does not hold, or if
    /// `key_val` is `None` and the bundle holds no raw collection.
    pub fn flat_map<D, DCB, L>(
        &self,
        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
        max_demand: usize,
        mut logic: L,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        VecCollection<'scope, T, DataflowErrorSer, Diff>,
    )
    where
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: for<'a> FnMut(
                &'a mut DatumVecBorrow<'_>,
                T,
                Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        // If `key_val` is set, we should have to use the corresponding arrangement.
        // If there isn't one, that implies an error in the contract between
        // key-production and available arrangements.
        if let Some((key, val)) = key_val {
            self.arrangement(&key)
                .expect("Should have ensured during planning that this arrangement exists.")
                .flat_map::<_, DCB, _>(val.as_ref(), max_demand, logic)
        } else {
            // No arrangement requested: build a pipeline operator over the raw ok
            // collection with two outputs, one for ok results and one for MFP errors.
            let (oks, errs) = self
                .collection
                .clone()
                .expect("Invariant violated: CollectionBundle contains no collection.");
            let scope = oks.inner.scope();
            let mut builder = OperatorBuilder::new("CollectionFlatMap".to_string(), scope);
            let (ok_output, ok_stream) = builder.new_output();
            let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
            let (err_output, err_stream) = builder.new_output();
            let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
            let mut input = builder.new_input(oks.inner, Pipeline);
            builder.build(move |_capabilities| {
                // Scratch space for decoded datums, reused across invocations.
                let mut datums = DatumVec::new();
                move |_frontiers| {
                    let mut ok_output = ok_output.activate();
                    let mut err_output = err_output.activate();
                    input.for_each(|time, data| {
                        // Retain the input capability to derive a `Capability` for each output;
                        // the `Session` type alias is fixed to `Capability<T>`.
                        let ok_cap = time.retain(0);
                        let err_cap = time.retain(1);
                        let mut ok_session = ok_output.session_with_builder(&ok_cap);
                        let mut err_session = err_output.session_with_builder(&err_cap);
                        // Apply `logic` to each update, decoding at most `max_demand` columns.
                        for (v, t, d) in data.drain(..) {
                            logic(
                                &mut datums.borrow_with_limit(&v, max_demand),
                                t,
                                d,
                                &mut ok_session,
                                &mut err_session,
                            );
                        }
                    });
                }
            });
            // Merge MFP-produced errors into the bundle's existing error collection.
            let errs = errs.concat(err_stream.as_collection());
            (ok_stream, errs)
        }
    }
690
    /// Factored out common logic for using literal keys in general traces.
    ///
    /// This logic is sufficiently interesting that we want to write it only
    /// once, and thereby avoid any skew in the two uses of the logic.
    ///
    /// The function presents the contents of the trace as `(key, value, time, delta)` tuples,
    /// where key and value are potentially specialized, but convertible into rows. The `logic`
    /// callback writes ok results into the first session and errors into the second, returning
    /// the number of records produced. See [`ArrangementFlavor::flat_map`] for the fuel
    /// rationale.
    fn flat_map_core_fallible<Tr, D, DCB, L>(
        trace: Arranged<'scope, Tr>,
        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
        mut logic: L,
        refuel: usize,
    ) -> (
        Stream<'scope, T, DCB::Container>,
        Stream<'scope, T, Vec<(DataflowErrorSer, T, Diff)>>,
    )
    where
        Tr: for<'a> TraceReader<
                Key<'a>: ToDatumIter,
                Val<'a>: ToDatumIter,
                Time = T,
                Diff = mz_repr::Diff,
            > + Clone
            + 'static,
        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: FnMut(
                Tr::Key<'_>,
                Tr::Val<'_>,
                T,
                mz_repr::Diff,
                &mut Session<T, DCB>,
                &mut Session<T, ECB<T>>,
            ) -> usize
            + 'static,
    {
        let scope = trace.stream.scope();

        // Stash the seek key (if any) in a key container owned by the operator, so a
        // `Tr::Key<'_>` borrow of it can be re-obtained on every activation below.
        let mut key_con = Tr::KeyContainer::with_capacity(1);
        if let Some(key) = &key {
            key_con.push_own(key);
        }
        let mode = if key.is_some() { "index" } else { "scan" };
        let name = format!("ArrangementFlatMap({})", mode);

        let mut builder = OperatorBuilder::new(name, scope.clone());
        // Output port 0 carries ok results; output port 1 carries errors.
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
        let (err_output, err_stream) = builder.new_output();
        let mut err_output = OutputBuilder::<_, ECB<T>>::from(err_output);
        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
        let operator_info = builder.operator_info();

        builder.build(move |_capabilities| {
            // Acquire an activator to reschedule the operator when it has unfinished work.
            let activator = scope.activator_for(operator_info.address);
            // Maintain a list of work to do, cursor to navigate and process.
            let mut todo = std::collections::VecDeque::new();
            move |_frontiers| {
                // Borrow the stashed seek key (if any) back out of the container.
                let key = key_con.get(0);
                let mut ok_output = ok_output.activate();
                let mut err_output = err_output.activate();

                // First, dequeue all batches.
                input.for_each(|time, data| {
                    // Retain a capability for each output, as the work may complete across
                    // multiple activations.
                    let ok_cap = time.retain(0);
                    let err_cap = time.retain(1);
                    for batch in data.iter() {
                        todo.push_back(PendingWork::new(
                            ok_cap.clone(),
                            err_cap.clone(),
                            batch.cursor(),
                            batch.clone(),
                        ));
                    }
                });

                // Second, make progress on `todo`.
                let mut fuel = refuel;
                while !todo.is_empty() && fuel > 0 {
                    todo.front_mut().unwrap().do_work(
                        key.as_ref(),
                        &mut logic,
                        &mut fuel,
                        &mut ok_output,
                        &mut err_output,
                    );
                    // `do_work` zeroes `fuel` when it stops mid-batch; only a batch that
                    // was fully drained is popped from the queue.
                    if fuel > 0 {
                        todo.pop_front();
                    }
                }
                // If we have not finished all work, re-activate the operator.
                if !todo.is_empty() {
                    activator.activate();
                }
            }
        });

        (ok_stream, err_stream)
    }
797
    /// Ok-only variant of [`Self::flat_map_core_fallible`]. The `logic` callback writes results
    /// into a single output session and returns the number of records produced (see the
    /// fallible variant for fuel semantics). Use this when the caller statically knows it
    /// will never produce `DataflowErrorSer` records, to avoid building a second output port
    /// and the empty err stream that would follow it.
    fn flat_map_core_ok<Tr, D, DCB, L>(
        trace: Arranged<'scope, Tr>,
        key: Option<&<Tr::KeyContainer as BatchContainer>::Owned>,
        mut logic: L,
        refuel: usize,
    ) -> Stream<'scope, T, DCB::Container>
    where
        Tr: for<'a> TraceReader<
                Key<'a>: ToDatumIter,
                Val<'a>: ToDatumIter,
                Time = T,
                Diff = mz_repr::Diff,
            > + Clone
            + 'static,
        <Tr::KeyContainer as BatchContainer>::Owned: PartialEq,
        D: Data,
        DCB: ContainerBuilder + PushInto<(D, T, Diff)>,
        L: FnMut(Tr::Key<'_>, Tr::Val<'_>, T, mz_repr::Diff, &mut Session<T, DCB>) -> usize
            + 'static,
    {
        let scope = trace.stream.scope();

        // Stash the seek key (if any) in a key container owned by the operator, so a
        // `Tr::Key<'_>` borrow of it can be re-obtained on every activation below.
        let mut key_con = Tr::KeyContainer::with_capacity(1);
        if let Some(key) = &key {
            key_con.push_own(key);
        }
        let mode = if key.is_some() { "index" } else { "scan" };
        let name = format!("ArrangementFlatMapOk({})", mode);

        let mut builder = OperatorBuilder::new(name, scope.clone());
        let (ok_output, ok_stream) = builder.new_output();
        let mut ok_output = OutputBuilder::<_, DCB>::from(ok_output);
        let mut input = builder.new_input(trace.stream.clone(), Pipeline);
        let operator_info = builder.operator_info();

        builder.build(move |_capabilities| {
            // Activator to reschedule the operator while unfinished work remains.
            let activator = scope.activator_for(operator_info.address);
            // Batches with unprocessed updates, in arrival order.
            let mut todo = std::collections::VecDeque::new();
            move |_frontiers| {
                let key = key_con.get(0);
                let mut ok_output = ok_output.activate();

                // First, dequeue all batches, retaining a capability for the single
                // output, as the work may complete across multiple activations.
                input.for_each(|time, data| {
                    let cap = time.retain(0);
                    for batch in data.iter() {
                        todo.push_back(PendingWorkOk::new(
                            cap.clone(),
                            batch.cursor(),
                            batch.clone(),
                        ));
                    }
                });

                // Second, make progress on `todo`, bounded by `refuel` output records.
                let mut fuel = refuel;
                while !todo.is_empty() && fuel > 0 {
                    todo.front_mut().unwrap().do_work(
                        key.as_ref(),
                        &mut logic,
                        &mut fuel,
                        &mut ok_output,
                    );
                    // `do_work` zeroes `fuel` when it stops mid-batch; only a batch that
                    // was fully drained is popped from the queue.
                    if fuel > 0 {
                        todo.pop_front();
                    }
                }
                // If we have not finished all work, re-activate the operator.
                if !todo.is_empty() {
                    activator.activate();
                }
            }
        });

        ok_stream
    }
876
877    /// Look up an arrangement by the expressions that form the key.
878    ///
879    /// The result may be `None` if no such arrangement exists, or it may be one of many
880    /// "arrangement flavors" that represent the types of arranged data we might have.
881    pub fn arrangement(&self, key: &[MirScalarExpr]) -> Option<ArrangementFlavor<'scope, T>> {
882        self.arranged.get(key).map(|x| x.clone())
883    }
884}
885
886impl<'scope, T: RenderTimestamp> CollectionBundle<'scope, T> {
887    /// Presents `self` as a stream of updates, having been subjected to `mfp`.
888    ///
889    /// This operator is able to apply the logic of `mfp` early, which can substantially
890    /// reduce the amount of data produced when `mfp` is non-trivial.
891    ///
892    /// The `key_val` argument, when present, indicates that a specific arrangement should
893    /// be used, and if, in addition, the `val` component is present,
894    /// that we can seek to the supplied row.
895    pub fn as_collection_core(
896        &self,
897        mut mfp: MapFilterProject,
898        key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
899        until: Antichain<mz_repr::Timestamp>,
900        config_set: &ConfigSet,
901    ) -> (
902        VecCollection<'scope, T, mz_repr::Row, Diff>,
903        VecCollection<'scope, T, DataflowErrorSer, Diff>,
904    ) {
905        mfp.optimize();
906        let mfp_plan = mfp.clone().into_plan().unwrap();
907
908        // If the MFP is trivial, we can just call `as_collection`.
909        // In the case that we weren't going to apply the `key_val` optimization,
910        // this path results in a slightly smaller and faster
911        // dataflow graph, and is intended to fix
912        // https://github.com/MaterializeInc/database-issues/issues/3111
913        let has_key_val = if let Some((_key, Some(_val))) = &key_val {
914            true
915        } else {
916            false
917        };
918
919        if mfp_plan.is_identity() && !has_key_val {
920            let key = key_val.map(|(k, _v)| k);
921            return self.as_specific_collection(key.as_deref(), config_set);
922        }
923
924        let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
925        mfp.permute_fn(|c| c, max_demand);
926        mfp.optimize();
927        let mfp_plan = mfp.into_plan().unwrap();
928
929        let mut datum_vec = DatumVec::new();
930        // Wrap in an `Rc` so that lifetimes work out.
931        let until = std::rc::Rc::new(until);
932
933        let (stream, errors) = self
934            .flat_map::<_, ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>, _>(
935                key_val,
936                max_demand,
937                move |row_datums, time, diff, ok_session, err_session| {
938                    let mut row_builder = SharedRow::get();
939                    let until = std::rc::Rc::clone(&until);
940                    let temp_storage = RowArena::new();
941                    let row_iter = row_datums.iter();
942                    let mut datums_local = datum_vec.borrow();
943                    datums_local.extend(row_iter);
944                    let event_time = time.event_time();
945                    let mut work: usize = 0;
946                    for result in mfp_plan.evaluate(
947                        &mut datums_local,
948                        &temp_storage,
949                        event_time,
950                        diff.clone(),
951                        move |time| !until.less_equal(time),
952                        &mut row_builder,
953                    ) {
954                        work += 1;
955                        match result {
956                            Ok((row, event_time, diff)) => {
957                                // Copy the whole time, and re-populate event time.
958                                let mut time: T = time.clone();
959                                *time.event_time_mut() = event_time;
960                                ok_session.give((row, time, diff));
961                            }
962                            Err((e, event_time, diff)) => {
963                                // Copy the whole time, and re-populate event time.
964                                let mut time: T = time.clone();
965                                *time.event_time_mut() = event_time;
966                                err_session.give((e, time, diff));
967                            }
968                        }
969                    }
970                    work
971                },
972            );
973
974        (stream.as_collection(), errors)
975    }
976    pub fn ensure_collections(
977        mut self,
978        collections: AvailableCollections,
979        input_key: Option<Vec<MirScalarExpr>>,
980        input_mfp: MapFilterProject,
981        as_of: Antichain<mz_repr::Timestamp>,
982        until: Antichain<mz_repr::Timestamp>,
983        config_set: &ConfigSet,
984        strategy: ArrangementStrategy,
985    ) -> Self
986    where
987        T: MaybeBucketByTime,
988    {
989        if collections == Default::default() {
990            return self;
991        }
992        // Cache collection to avoid reforming it each time.
993        //
994        // TODO(mcsherry): In theory this could be faster run out of another arrangement,
995        // as the `map_fallible` that follows could be run against an arrangement itself.
996        //
997        // Note(btv): If we ever do that, we would then only need to make the raw collection here
998        // if `collections.raw` is true.
999
1000        for (key, _, _) in collections.arranged.iter() {
1001            soft_assert_or_log!(
1002                !self.arranged.contains_key(key),
1003                "LIR ArrangeBy tried to create an existing arrangement"
1004            );
1005        }
1006
1007        // Track whether we applied temporal bucketing, to avoid double-bucketing.
1008        let mut bucketed = false;
1009
1010        // True iff at least one new arrangement will actually be built below. Bucketing only
1011        // pays off when something downstream merges/compacts the future-stamped updates; on a
1012        // pure raw collection (no new arrangement) the work is wasted.
1013        let will_create_arrangement = collections
1014            .arranged
1015            .iter()
1016            .any(|(key, _, _)| !self.arranged.contains_key(key));
1017
1018        // We need the collection if either (1) it is explicitly demanded, or (2) we are going to render any arrangement
1019        let form_raw_collection = collections.raw || will_create_arrangement;
1020        if form_raw_collection && self.collection.is_none() {
1021            let (oks, errs) =
1022                self.as_collection_core(input_mfp, input_key.map(|k| (k, None)), until, config_set);
1023            // Apply temporal bucketing when the lowering selected `TemporalBucketing` and
1024            // we will build at least one arrangement. This path fires when the collection
1025            // must be formed from scratch (e.g., from an arrangement via as_collection_core).
1026            let oks = if will_create_arrangement
1027                && matches!(strategy, ArrangementStrategy::TemporalBucketing)
1028                && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1029            {
1030                let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1031                    .get(config_set)
1032                    .try_into()
1033                    .expect("must fit");
1034                bucketed = true;
1035                T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1036            } else {
1037                oks
1038            };
1039            self.collection = Some((oks, errs));
1040        }
1041        for (key, _, thinning) in collections.arranged {
1042            if !self.arranged.contains_key(&key) {
1043                // TODO: Consider allowing more expressive names.
1044                let name = format!("ArrangeBy[{:?}]", key);
1045
1046                let (oks, errs) = self
1047                    .collection
1048                    .take()
1049                    .expect("Collection constructed above");
1050                // Apply temporal bucketing if the collection already existed on
1051                // the bundle (e.g., from an upstream temporal Mfp or Get) and we
1052                // haven't bucketed yet. This is the common path for temporal-MFP
1053                // → ArrangeBy flows.
1054                let oks = if !bucketed
1055                    && matches!(strategy, ArrangementStrategy::TemporalBucketing)
1056                    && ENABLE_COMPUTE_TEMPORAL_BUCKETING.get(config_set)
1057                {
1058                    let summary: mz_repr::Timestamp = TEMPORAL_BUCKETING_SUMMARY
1059                        .get(config_set)
1060                        .try_into()
1061                        .expect("must fit");
1062                    bucketed = true;
1063                    T::maybe_apply_temporal_bucketing(oks.inner, as_of.clone(), summary)
1064                } else {
1065                    oks
1066                };
1067                let (oks, errs_keyed, passthrough) =
1068                    Self::arrange_collection(&name, oks, key.clone(), thinning.clone());
1069                let errs_concat: KeyCollection<_, _, _> = errs.clone().concat(errs_keyed).into();
1070                self.collection = Some((passthrough, errs));
1071                let errs =
1072                    errs_concat.mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, ErrSpine<_, _>>(
1073                        &format!("{}-errors", name),
1074                    );
1075                self.arranged
1076                    .insert(key, ArrangementFlavor::Local(oks, errs));
1077            }
1078        }
1079        self
1080    }
1081
1082    /// Builds an arrangement from a collection, using the specified key and value thinning.
1083    ///
1084    /// The arrangement's key is based on the `key` expressions, and the value the input with
1085    /// the `thinning` applied to it. It selects which of the input columns are included in the
1086    /// value of the arrangement. The thinning is in support of permuting arrangements such that
1087    /// columns in the key are not included in the value.
1088    ///
1089    /// In addition to the ok and err streams, we produce a passthrough stream that forwards
1090    /// the input as-is, which allows downstream consumers to reuse the collection without
1091    /// teeing the stream.
1092    fn arrange_collection(
1093        name: &String,
1094        oks: VecCollection<'scope, T, Row, Diff>,
1095        key: Vec<MirScalarExpr>,
1096        thinning: Vec<usize>,
1097    ) -> (
1098        Arranged<'scope, RowRowAgent<T, Diff>>,
1099        VecCollection<'scope, T, DataflowErrorSer, Diff>,
1100        VecCollection<'scope, T, Row, Diff>,
1101    ) {
1102        // This operator implements a `map_fallible`, but produces columnar updates for the ok
1103        // stream. The `map_fallible` cannot be used here because the closure cannot return
1104        // references, which is what we need to push into columnar streams. Instead, we use a
1105        // bespoke operator that also optimizes reuse of allocations across individual updates.
1106        let mut builder = OperatorBuilder::new("FormArrangementKey".to_string(), oks.inner.scope());
1107        let (ok_output, ok_stream) = builder.new_output();
1108        let mut ok_output =
1109            OutputBuilder::<_, ColumnBuilder<((Row, Row), T, Diff)>>::from(ok_output);
1110        let (err_output, err_stream) = builder.new_output();
1111        let mut err_output = OutputBuilder::from(err_output);
1112        let (passthrough_output, passthrough_stream) = builder.new_output();
1113        let mut passthrough_output = OutputBuilder::from(passthrough_output);
1114        let mut input = builder.new_input(oks.inner, Pipeline);
1115        builder.set_notify_for(0, FrontierInterest::Never);
1116        builder.build(move |_capabilities| {
1117            let mut key_buf = Row::default();
1118            let mut val_buf = Row::default();
1119            let mut datums = DatumVec::new();
1120            let mut temp_storage = RowArena::new();
1121            move |_frontiers| {
1122                let mut ok_output = ok_output.activate();
1123                let mut err_output = err_output.activate();
1124                let mut passthrough_output = passthrough_output.activate();
1125                input.for_each(|time, data| {
1126                    let mut ok_session = ok_output.session_with_builder(&time);
1127                    let mut err_session = err_output.session(&time);
1128                    for (row, time, diff) in data.iter() {
1129                        temp_storage.clear();
1130                        let datums = datums.borrow_with(row);
1131                        let key_iter = key.iter().map(|k| k.eval(&datums, &temp_storage));
1132                        match key_buf.packer().try_extend(key_iter) {
1133                            Ok(()) => {
1134                                let val_datum_iter = thinning.iter().map(|c| datums[*c]);
1135                                val_buf.packer().extend(val_datum_iter);
1136                                ok_session.give(((&*key_buf, &*val_buf), time, diff));
1137                            }
1138                            Err(e) => {
1139                                err_session.give((e.into(), time.clone(), *diff));
1140                            }
1141                        }
1142                    }
1143                    passthrough_output.session(&time).give_container(data);
1144                });
1145            }
1146        });
1147
1148        let oks = ok_stream
1149            .mz_arrange_core::<
1150                _,
1151                Col2ValBatcher<_, _, _, _>,
1152                RowRowBuilder<_, _>,
1153                RowRowSpine<_, _>,
1154            >(
1155                ExchangeCore::<ColumnBuilder<_>, _>::new_core(
1156                    columnar_exchange::<Row, Row, T, Diff>,
1157                ),
1158                name
1159            );
1160        (
1161            oks,
1162            err_stream.as_collection(),
1163            passthrough_stream.as_collection(),
1164        )
1165    }
1166}
1167
/// Type alias for a timely output `Session` whose capability is a `Capability<T>`. The container
/// builder `CB` is left to the caller; sessions can therefore drive consolidating, capacity, or
/// (in the future) columnar output builders without changing call sites.
type Session<'a, 'b, T, CB> =
    timely::dataflow::operators::generic::Session<'a, 'b, T, CB, Capability<T>>;

/// Container builder used for the err output of every flat_map variant. Pre-refactor the
/// merged Ok/Err stream flowed through a [`ConsolidatingContainerBuilder`] before the
/// `map_fallible` demux split it; we preserve that consolidation here so errors with the
/// same `(error, time)` cancel within a batch rather than propagating to downstream.
type ECB<T> = ConsolidatingContainerBuilder<Vec<(DataflowErrorSer, T, Diff)>>;

/// Number of output records the arrangement flat_map operators may produce before yielding.
/// See [`ArrangementFlavor::flat_map`] for the fuel rationale; the constant is a pragmatic
/// compromise and not tuned empirically. Larger values reduce rescheduling overhead at the
/// cost of longer uninterrupted activations.
const REFUEL: usize = 1_000_000;
1184
/// A batch of trace updates awaiting processing by the fallible flat_map operator, together
/// with the capabilities required to emit its results.
struct PendingWork<C>
where
    C: Cursor,
{
    /// Capability for the `ok` output (output port 0).
    ok_capability: Capability<C::Time>,
    /// Capability for the `err` output (output port 1).
    err_capability: Capability<C::Time>,
    /// Cursor into `batch`; retained across activations so work can resume where it stopped.
    cursor: C,
    /// The batch storage the cursor navigates.
    batch: C::Storage,
}
1196
1197impl<C> PendingWork<C>
1198where
1199    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1200{
1201    /// Create a new bundle of pending work, from a pair of capabilities (one per output),
1202    /// a cursor, and backing storage.
1203    fn new(
1204        ok_capability: Capability<C::Time>,
1205        err_capability: Capability<C::Time>,
1206        cursor: C,
1207        batch: C::Storage,
1208    ) -> Self {
1209        Self {
1210            ok_capability,
1211            err_capability,
1212            cursor,
1213            batch,
1214        }
1215    }
1216    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to
1217    /// the two output sessions.
1218    fn do_work<D, DCB, L>(
1219        &mut self,
1220        key: Option<&C::Key<'_>>,
1221        logic: &mut L,
1222        fuel: &mut usize,
1223        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1224        err_output: &mut OutputBuilderSession<'_, C::Time, ECB<C::Time>>,
1225    ) where
1226        D: Data,
1227        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1228        L: FnMut(
1229                C::Key<'_>,
1230                C::Val<'_>,
1231                C::Time,
1232                C::Diff,
1233                &mut Session<C::Time, DCB>,
1234                &mut Session<C::Time, ECB<C::Time>>,
1235            ) -> usize
1236            + 'static,
1237    {
1238        let mut ok_session = ok_output.session_with_builder(&self.ok_capability);
1239        let mut err_session = err_output.session_with_builder(&self.err_capability);
1240        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1241            logic(k, v, t, d, &mut ok_session, &mut err_session)
1242        });
1243    }
1244}
1245
/// Pending work for the Ok-only variant of `flat_map_core_fallible`. Holds a single capability since
/// the operator has only one output port.
struct PendingWorkOk<C>
where
    C: Cursor,
{
    /// Capability for the sole (ok) output.
    capability: Capability<C::Time>,
    /// Cursor into `batch`; retained across activations so work can resume where it stopped.
    cursor: C,
    /// The batch storage the cursor navigates.
    batch: C::Storage,
}
1256
1257impl<C> PendingWorkOk<C>
1258where
1259    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1260{
1261    fn new(capability: Capability<C::Time>, cursor: C, batch: C::Storage) -> Self {
1262        Self {
1263            capability,
1264            cursor,
1265            batch,
1266        }
1267    }
1268
1269    /// Perform roughly `fuel` work through the cursor, applying `logic` and sending results to
1270    /// the single output session.
1271    fn do_work<D, DCB, L>(
1272        &mut self,
1273        key: Option<&C::Key<'_>>,
1274        logic: &mut L,
1275        fuel: &mut usize,
1276        ok_output: &mut OutputBuilderSession<'_, C::Time, DCB>,
1277    ) where
1278        D: Data,
1279        DCB: ContainerBuilder + PushInto<(D, C::Time, C::Diff)>,
1280        L: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff, &mut Session<C::Time, DCB>) -> usize
1281            + 'static,
1282    {
1283        let mut ok_session = ok_output.session_with_builder(&self.capability);
1284        walk_cursor(&mut self.cursor, &self.batch, key, fuel, |k, v, t, d| {
1285            logic(k, v, t, d, &mut ok_session)
1286        });
1287    }
1288}
1289
1290/// Walk a cursor, calling `emit` for each consolidated `(key, val, time, diff)` tuple. If
1291/// `key` is set, the cursor is seeked to it and only values for that key are produced.
1292///
1293/// `emit` returns the number of records it produced for the given input tuple. The cursor
1294/// stops as soon as the accumulated emit count reaches `*fuel`, leaving the cursor in place
1295/// so work can resume on a later call. Within a batch, both the inner val loop and the
1296/// outer key loop are bounded only by emit count, so selective filters (`emit` returns 0)
1297/// run to batch completion in a single activation — see [`ArrangementFlavor::flat_map`]
1298/// for why fuel counts output rather than input.
1299fn walk_cursor<C, F>(
1300    cursor: &mut C,
1301    batch: &C::Storage,
1302    key: Option<&C::Key<'_>>,
1303    fuel: &mut usize,
1304    mut emit: F,
1305) where
1306    C: Cursor<KeyContainer: BatchContainer<Owned: PartialEq + Sized>>,
1307    F: FnMut(C::Key<'_>, C::Val<'_>, C::Time, C::Diff) -> usize,
1308{
1309    use differential_dataflow::consolidation::consolidate;
1310
1311    let mut work: usize = 0;
1312    let mut buffer = Vec::new();
1313    if let Some(key) = key {
1314        let key = C::KeyContainer::reborrow(*key);
1315        if cursor.get_key(batch).map(|k| k == key) != Some(true) {
1316            cursor.seek_key(batch, key);
1317        }
1318        if cursor.get_key(batch).map(|k| k == key) == Some(true) {
1319            let key = cursor.key(batch);
1320            while let Some(val) = cursor.get_val(batch) {
1321                cursor.map_times(batch, |time, diff| {
1322                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
1323                });
1324                consolidate(&mut buffer);
1325                for (time, diff) in buffer.drain(..) {
1326                    work += emit(key, val, time, diff);
1327                }
1328                cursor.step_val(batch);
1329                if work >= *fuel {
1330                    *fuel = 0;
1331                    return;
1332                }
1333            }
1334        }
1335    } else {
1336        while let Some(key) = cursor.get_key(batch) {
1337            while let Some(val) = cursor.get_val(batch) {
1338                cursor.map_times(batch, |time, diff| {
1339                    buffer.push((C::owned_time(time), C::owned_diff(diff)));
1340                });
1341                consolidate(&mut buffer);
1342                for (time, diff) in buffer.drain(..) {
1343                    work += emit(key, val, time, diff);
1344                }
1345                cursor.step_val(batch);
1346                if work >= *fuel {
1347                    *fuel = 0;
1348                    return;
1349                }
1350            }
1351            cursor.step_key(batch);
1352        }
1353    }
1354    *fuel -= work;
1355}