// mz_compute/render/join/delta_join.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult [DeltaJoinPlan] documentation for details.

#![allow(clippy::op_ref)]
16use std::collections::{BTreeMap, BTreeSet};
17
18use std::rc::Rc;
19
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::operators::arrange::Arranged;
22use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
23use differential_dataflow::{AsCollection, VecCollection};
24use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2;
25use mz_compute_types::plan::join::JoinClosure;
26use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
27use mz_dyncfg::ConfigSet;
28use mz_expr::MirScalarExpr;
29use mz_repr::fixed_length::ToDatumIter;
30use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
31use mz_storage_types::errors::DataflowError;
32use mz_timely_util::operator::{CollectionExt, StreamExt};
33use timely::container::CapacityContainerBuilder;
34use timely::dataflow::Scope;
35use timely::dataflow::channels::pact::Pipeline;
36use timely::dataflow::operators::OkErr;
37use timely::dataflow::operators::generic::Session;
38use timely::dataflow::operators::vec::Map;
39use timely::progress::Antichain;
40use timely::progress::timestamp::Refines;
41
42use crate::render::RenderTimestamp;
43use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
44use crate::typedefs::{RowRowAgent, RowRowEnter};
45
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed in to the join pipeline if at all possible.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation, and attempt
            // to find a plan that maximally uses existing keys (better: uses
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            //
            // Map values encode the arrangement flavor: `Ok` holds a dataflow-local
            // arrangement and `Err` holds an imported trace. The two cases require
            // distinct generic instantiations downstream.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    // Import each arrangement's error stream at most once.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    // Import each arrangement's error stream at most once.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[relation]` and reflects all strictly prior updates and
                // concurrent updates from relations prior to `relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `relation` and in the order
                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
                    // subset of relations encountered so far, and we will have applied as much as we can
                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
                    // available columns.
                    //
                    // As we go, we will track the physical locations of each intended output column, as well
                    // as the locations of intermediate results from partial application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first use our
                    // available information to determine the filtering and logic that we can apply, and
                    // introduce that in to the `lookup` logic to cause it to happen in that operator.

                    // Collects error streams for the region scope. Concats before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    // `Ok` is a dataflow-local arrangement, `Err` an imported trace; each
                    // needs its own generic instantiation of `build_update_stream`.
                    let update_stream = match val {
                        Ok(local) => {
                            let arranged = local.clone().enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        Err(trace) => {
                            let arranged = trace.clone().enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    }
                                }
                            };
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reuseable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let mut row_builder = SharedRow::get();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}
316
317/// Constructs a `half_join` from supplied arguments.
318///
319/// This method exists to factor common logic from four code paths that are generic over the type of trace.
320/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
321/// total order on relations (in order to break ties consistently).
322///
323/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
324/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
325/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
326/// to ensure that any two updates are matched at most once.
327fn build_halfjoin<G, Tr, CF>(
328    updates: VecCollection<G, (Row, G::Timestamp), Diff>,
329    trace: Arranged<G, Tr>,
330    prev_key: Vec<MirScalarExpr>,
331    prev_thinning: Vec<usize>,
332    comparison: CF,
333    closure: JoinClosure,
334    config_set: Rc<ConfigSet>,
335) -> (
336    VecCollection<G, (Row, G::Timestamp), Diff>,
337    VecCollection<G, DataflowError, Diff>,
338)
339where
340    G: Scope,
341    G::Timestamp: RenderTimestamp,
342    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
343    for<'a> Tr::Val<'a>: ToDatumIter,
344    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
345{
346    let use_half_join2 = ENABLE_HALF_JOIN2.get(&config_set);
347
348    let name = "DeltaJoinKeyPreparation";
349    type CB<C> = CapacityContainerBuilder<C>;
350    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
351        // Reuseable allocation for unpacking.
352        let mut datums = DatumVec::new();
353        move |(row, time)| {
354            let temp_storage = RowArena::new();
355            let datums_local = datums.borrow_with(&row);
356            let mut row_builder = SharedRow::get();
357            row_builder.packer().try_extend(
358                prev_key
359                    .iter()
360                    .map(|e| e.eval(&datums_local, &temp_storage)),
361            )?;
362            let key = row_builder.clone();
363            row_builder
364                .packer()
365                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
366            let row_value = row_builder.clone();
367
368            Ok((key, row_value, time))
369        }
370    });
371    let datums = DatumVec::new();
372
373    if use_half_join2 {
374        build_halfjoin2(updates, trace, comparison, closure, datums, errs)
375    } else {
376        build_halfjoin1(updates, trace, comparison, closure, datums, errs)
377    }
378}
379
/// `half_join2` implementation (less-quadratic, new default).
///
/// Joins the keyed `updates` stream against `trace`, applying `closure` to the
/// concatenation of key, stream-row, and looked-up datums. Results are paired with
/// the lookup `time` while the initial time is carried through unchanged. Errors
/// produced by `closure` are concatenated onto the pre-existing `errs` stream.
fn build_halfjoin2<G, Tr, CF>(
    updates: VecCollection<G, (Row, Row, G::Timestamp), Diff>,
    trace: Arranged<G, Tr>,
    comparison: CF,
    closure: JoinClosure,
    mut datums: DatumVec,
    errs: VecCollection<G, DataflowError, Diff>,
) -> (
    VecCollection<G, (Row, G::Timestamp), Diff>,
    VecCollection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
    type CB<C> = CapacityContainerBuilder<C>;

    if closure.could_error() {
        // Fallible path: emit `Result`s and split them into ok/err streams via `ok_err` below.
        let (oks, errs2) = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                // Frontier hint handed to the operator: the predecessor of `time`.
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble the full set of datums the closure may reference:
                // key columns, then stream columns, then looked-up columns.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // Apply the closure once; replicate the result for each matched (time, diff).
                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    use timely::container::PushInto;
                    session.push_into(data);
                }
            },
        )
        .ok_err(|(data_time, init_time, diff)| {
            // TODO(mcsherry): consider `ok_err()` for `Collection`.
            match data_time {
                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
            }
        });

        (
            // `data` here is an `Option<Row>` (the closure may filter rows); flatten away `None`s.
            oks.as_collection().flat_map(|x| x),
            errs.concat(errs2.as_collection()),
        )
    } else {
        // Infallible path: the closure promises not to error, so rows are pushed directly.
        let oks = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
                // Avoid row assembly entirely when there are no matching times.
                if output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // `None` means the closure filtered this row out; emit nothing in that case.
                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        use timely::container::PushInto;
                        session.push_into(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        (oks.as_collection(), errs)
    }
}
486
/// Original `half_join` implementation (fallback when `half_join2` is disabled).
///
/// Semantically parallel to `build_halfjoin2`: joins keyed `updates` against
/// `trace`, applies `closure` to the concatenated datums, and splits results
/// into ok and error streams. Differs in using the older `half_join` operator
/// and `Session::give` rather than the container-builder `push_into` path.
fn build_halfjoin1<G, Tr, CF>(
    updates: VecCollection<G, (Row, Row, G::Timestamp), Diff>,
    trace: Arranged<G, Tr>,
    comparison: CF,
    closure: JoinClosure,
    mut datums: DatumVec,
    errs: VecCollection<G, DataflowError, Diff>,
) -> (
    VecCollection<G, (Row, G::Timestamp), Diff>,
    VecCollection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
    type CB<C> = CapacityContainerBuilder<C>;

    if closure.could_error() {
        // Fallible path: emit `Result`s and split them into ok/err streams via `ok_err` below.
        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                // Frontier hint handed to the operator: the predecessor of `time`.
                antichain.insert(time.step_back());
            },
            comparison,
            |_timer, count| count > 1_000_000,
            move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
                  key,
                  stream_row: &Row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble the full set of datums the closure may reference:
                // key columns, then stream columns, then looked-up columns.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // Apply the closure once; replicate the result for each matched (time, diff).
                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    session.give(data);
                }
            },
        )
        .ok_err(|(data_time, init_time, diff)| match data_time {
            (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
            (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
        });

        (
            // `data` here is an `Option<Row>` (the closure may filter rows); flatten away `None`s.
            oks.as_collection().flat_map(|x| x),
            errs.concat(errs2.as_collection()),
        )
    } else {
        // Infallible path: the closure promises not to error, so rows are given directly.
        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            |_timer, count| count > 1_000_000,
            move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
                  key,
                  stream_row: &Row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Avoid row assembly entirely when there are no matching times.
                if output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // `None` means the closure filtered this row out; emit nothing in that case.
                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        (oks.as_collection(), errs)
    }
}
594
/// Builds the beginning of the update stream of a delta path.
///
/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
/// updates happening at the same time on different relations.
///
/// Returns the stream of rows (after applying `initial_closure`) together with a
/// stream of errors produced by that closure.
fn build_update_stream<G, Tr>(
    trace: Arranged<G, Tr>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (
    VecCollection<G, Row, Diff>,
    VecCollection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Key<'a>: ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
{
    // Translate the outer `as_of` frontier into this scope's inner timestamp type.
    let mut inner_as_of = Antichain::new();
    for event_time in as_of.elements().iter() {
        inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
    }

    let (ok_stream, err_stream) =
        trace
            .stream
            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
                // Reuseable allocation for unpacking rows.
                let mut datums = DatumVec::new();
                Box::new(move |input, ok_output, err_output| {
                    // Buffer to accumulate contributing (time, diff) pairs for each (key, val).
                    let mut times_diffs = Vec::default();
                    input.for_each(|time, data| {
                        let mut row_builder = SharedRow::get();
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);

                        for wrapper in data.iter() {
                            let batch = &wrapper;
                            let mut cursor = batch.cursor();
                            while let Some(key) = cursor.get_key(batch) {
                                while let Some(val) = cursor.get_val(batch) {
                                    // Collect contributing (time, diff) pairs before invoking the closure.
                                    // Updates at the `as_of` are retained only for the first relation;
                                    // see the function-level comment for the tie-breaking rationale.
                                    cursor.map_times(batch, |time, diff| {
                                        if source_relation == 0
                                            || inner_as_of.elements().iter().all(|e| e != time)
                                        {
                                            // TODO: Consolidate as we push, defensively.
                                            times_diffs
                                                .push((Tr::owned_time(time), Tr::owned_diff(diff)));
                                        }
                                    });
                                    differential_dataflow::consolidation::consolidate(
                                        &mut times_diffs,
                                    );
                                    // These can not-uncommonly be empty, if the inbound updates cancel.
                                    if !times_diffs.is_empty() {
                                        let temp_storage = RowArena::new();

                                        let mut datums_local = datums.borrow();
                                        datums_local.extend(key.to_datum_iter());
                                        datums_local.extend(val.to_datum_iter());

                                        if !initial_closure.is_identity() {
                                            // Apply the closure once and replicate its outcome
                                            // (row, error, or filtered-out) across all times.
                                            match initial_closure
                                                .apply(
                                                    &mut datums_local,
                                                    &temp_storage,
                                                    &mut row_builder,
                                                )
                                                .map(|row| row.cloned())
                                                .transpose()
                                            {
                                                Some(Ok(row)) => {
                                                    for (time, diff) in times_diffs.drain(..) {
                                                        ok_session.give((row.clone(), time, diff))
                                                    }
                                                }
                                                Some(Err(err)) => {
                                                    for (time, diff) in times_diffs.drain(..) {
                                                        err_session.give((err.clone(), time, diff))
                                                    }
                                                }
                                                None => {}
                                            }
                                        } else {
                                            // Identity closure: just repack the datums as a row.
                                            let row = {
                                                row_builder.packer().extend(&*datums_local);
                                                row_builder.clone()
                                            };
                                            for (time, diff) in times_diffs.drain(..) {
                                                ok_session.give((row.clone(), time, diff));
                                            }
                                        }
                                    }
                                    times_diffs.clear();

                                    cursor.step_val(batch);
                                }
                                cursor.step_key(batch);
                            }
                        }
                    });
                })
            });

    (
        ok_stream.as_collection(),
        err_stream.as_collection().map(DataflowError::from),
    )
}