Skip to main content

mz_compute/render/join/
delta_join.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult [DeltaJoinPlan] documentation for details.
13
14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use std::rc::Rc;
19
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::operators::arrange::Arranged;
22use differential_dataflow::trace::implementations::BatchContainer;
23use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
24use differential_dataflow::{AsCollection, VecCollection};
25use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2;
26use mz_compute_types::plan::join::JoinClosure;
27use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
28use mz_dyncfg::ConfigSet;
29use mz_expr::MirScalarExpr;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
32use mz_storage_types::errors::DataflowError;
33use mz_timely_util::operator::{CollectionExt, StreamExt};
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::Pipeline;
36use timely::dataflow::operators::OkErr;
37use timely::dataflow::operators::generic::Session;
38use timely::dataflow::operators::vec::Map;
39use timely::progress::Antichain;
40
41use crate::render::RenderTimestamp;
42use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
43use crate::typedefs::{RowRowAgent, RowRowEnter};
44
45impl<'scope, T: RenderTimestamp> Context<'scope, T> {
46    /// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows.
47    ///
48    /// The join is followed by the application of `map_filter_project`, whose
49    /// implementation will be pushed in to the join pipeline if at all possible.
50    pub fn render_delta_join(
51        &self,
52        inputs: Vec<CollectionBundle<'scope, T>>,
53        join_plan: DeltaJoinPlan,
54    ) -> CollectionBundle<'scope, T> {
55        // We create a new region to contain the dataflow paths for the delta join.
56        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
57            // Collects error streams for the ambient scope.
58            let mut inner_errs = Vec::new();
59
60            // Deduplicate the error streams of multiply used arrangements.
61            let mut err_dedup = BTreeSet::new();
62
63            // Our plan is to iterate through each input relation, and attempt
64            // to find a plan that maximally uses existing keys (better: uses
65            // existing arrangements, to which we have access).
66            let mut join_results = Vec::new();
67
68            // First let's prepare the input arrangements we will need.
69            // This reduces redundant imports, and simplifies the dataflow structure.
70            // As the arrangements are all shared, it should not dramatically improve
71            // the efficiency, but the dataflow simplification is worth doing.
72            let mut arrangements = BTreeMap::new();
73            for path_plan in join_plan.path_plans.iter() {
74                for stage_plan in path_plan.stage_plans.iter() {
75                    let lookup_idx = stage_plan.lookup_relation;
76                    let lookup_key = stage_plan.lookup_key.clone();
77                    arrangements
78                        .entry((lookup_idx, lookup_key.clone()))
79                        .or_insert_with(|| {
80                            match inputs[lookup_idx]
81                                .arrangement(&lookup_key)
82                                .unwrap_or_else(|| {
83                                    panic!(
84                                        "Arrangement alarmingly absent!: {}, {:?}",
85                                        lookup_idx, lookup_key,
86                                    )
87                                }) {
88                                ArrangementFlavor::Local(oks, errs) => {
89                                    if err_dedup.insert((lookup_idx, lookup_key)) {
90                                        inner_errs.push(
91                                            errs.enter_region(inner)
92                                                .as_collection(|k, _v| k.clone()),
93                                        );
94                                    }
95                                    Ok(oks.enter_region(inner))
96                                }
97                                ArrangementFlavor::Trace(_gid, oks, errs) => {
98                                    if err_dedup.insert((lookup_idx, lookup_key)) {
99                                        inner_errs.push(
100                                            errs.enter_region(inner)
101                                                .as_collection(|k, _v| k.clone()),
102                                        );
103                                    }
104                                    Err(oks.enter_region(inner))
105                                }
106                            }
107                        });
108                }
109            }
110
111            for path_plan in join_plan.path_plans {
112                // Deconstruct the stages of the path plan.
113                let DeltaPathPlan {
114                    source_relation,
115                    initial_closure,
116                    stage_plans,
117                    final_closure,
118                    source_key,
119                } = path_plan;
120
121                // This collection determines changes that result from updates inbound
122                // from `inputs[relation]` and reflects all strictly prior updates and
123                // concurrent updates from relations prior to `relation`.
124                let name = format!("delta path {}", source_relation);
125                let path_results = inner.clone().region_named(&name, |region| {
126                    // The plan is to move through each relation, starting from `relation` and in the order
127                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
128                    // subset of relations encountered so far, and we will have applied as much as we can
129                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
130                    // available columns.
131                    //
132                    // As we go, we will track the physical locations of each intended output column, as well
133                    // as the locations of intermediate results from partial application of `map_filter_project`.
134                    //
135                    // Just before we apply the `lookup` function to perform a join, we will first use our
136                    // available information to determine the filtering and logic that we can apply, and
137                    // introduce that in to the `lookup` logic to cause it to happen in that operator.
138
139                    // Collects error streams for the region scope. Concats before leaving.
140                    let mut region_errs = Vec::with_capacity(inputs.len());
141
142                    // Ensure this input is rendered, and extract its update stream.
143                    let val = arrangements
144                        .get(&(source_relation, source_key))
145                        .expect("Arrangement promised by the planner is absent!");
146                    let as_of = self.as_of_frontier.clone();
147                    let update_stream = match val {
148                        Ok(local) => {
149                            let arranged = local.clone().enter_region(region);
150                            let (update_stream, err_stream) =
151                                build_update_stream::<_, RowRowAgent<_, _>>(
152                                    arranged,
153                                    as_of,
154                                    source_relation,
155                                    initial_closure,
156                                );
157                            region_errs.push(err_stream);
158                            update_stream
159                        }
160                        Err(trace) => {
161                            let arranged = trace.clone().enter_region(region);
162                            let (update_stream, err_stream) =
163                                build_update_stream::<_, RowRowEnter<_, _, _>>(
164                                    arranged,
165                                    as_of,
166                                    source_relation,
167                                    initial_closure,
168                                );
169                            region_errs.push(err_stream);
170                            update_stream
171                        }
172                    };
173                    // Promote `time` to a datum element.
174                    //
175                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
176                    // while tracking the initial time `init_time` separately and without
177                    // modification. The initial value for both times is the initial time.
178                    let mut update_stream = update_stream
179                        .inner
180                        .map(|(v, t, d)| ((v, t.clone()), t, d))
181                        .as_collection();
182
183                    // Repeatedly update `update_stream` to reflect joins with more and more
184                    // other relations, in the specified order.
185                    for stage_plan in stage_plans {
186                        let DeltaStagePlan {
187                            lookup_relation,
188                            stream_key,
189                            stream_thinning,
190                            lookup_key,
191                            closure,
192                        } = stage_plan;
193
194                        // We require different logic based on the relative order of the two inputs.
195                        // If the `source` relation precedes the `lookup` relation, we present all
196                        // updates with less or equal `time`, and otherwise we present only updates
197                        // with strictly less `time`.
198                        //
199                        // We need to write the logic twice, as there are two types of arrangement
200                        // we might have: either dataflow-local or an imported trace.
201                        let (oks, errs) =
202                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
203                                Ok(local) => {
204                                    if source_relation < lookup_relation {
205                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
206                                            update_stream,
207                                            local.clone().enter_region(region),
208                                            stream_key,
209                                            stream_thinning,
210                                            |t1, t2| t1.le(t2),
211                                            closure,
212                                            Rc::clone(&self.config_set),
213                                        )
214                                    } else {
215                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
216                                            update_stream,
217                                            local.clone().enter_region(region),
218                                            stream_key,
219                                            stream_thinning,
220                                            |t1, t2| t1.lt(t2),
221                                            closure,
222                                            Rc::clone(&self.config_set),
223                                        )
224                                    }
225                                }
226                                Err(trace) => {
227                                    if source_relation < lookup_relation {
228                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
229                                            update_stream,
230                                            trace.clone().enter_region(region),
231                                            stream_key,
232                                            stream_thinning,
233                                            |t1, t2| t1.le(t2),
234                                            closure,
235                                            Rc::clone(&self.config_set),
236                                        )
237                                    } else {
238                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
239                                            update_stream,
240                                            trace.clone().enter_region(region),
241                                            stream_key,
242                                            stream_thinning,
243                                            |t1, t2| t1.lt(t2),
244                                            closure,
245                                            Rc::clone(&self.config_set),
246                                        )
247                                    }
248                                }
249                            };
250                        update_stream = oks;
251                        region_errs.push(errs);
252                    }
253
254                    // Delay updates as appropriate.
255                    //
256                    // The `half_join` operator maintains a time that we now discard (the `_`),
257                    // and replace with the `time` that is maintained with the data. The former
258                    // exists to pin a consistent total order on updates throughout the process,
259                    // while allowing `time` to vary upwards as a result of actions on time.
260                    let mut update_stream = update_stream
261                        .inner
262                        .map(|((row, time), _, diff)| (row, time, diff))
263                        .as_collection();
264
265                    // We have completed the join building, but may have work remaining.
266                    // For example, we may have expressions not pushed down (e.g. literals)
267                    // and projections that could not be applied (e.g. column repetition).
268                    if let Some(final_closure) = final_closure {
269                        let name = "DeltaJoinFinalization";
270                        type CB<C> = ConsolidatingContainerBuilder<C>;
271                        let (updates, errors) = update_stream
272                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
273                                // Reuseable allocation for unpacking.
274                                let mut datums = DatumVec::new();
275                                move |row| {
276                                    let mut row_builder = SharedRow::get();
277                                    let temp_storage = RowArena::new();
278                                    let mut datums_local = datums.borrow_with(&row);
279                                    // TODO(mcsherry): re-use `row` allocation.
280                                    final_closure
281                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
282                                        .map(|row| row.cloned())
283                                        .map_err(DataflowError::from)
284                                        .transpose()
285                                }
286                            });
287
288                        update_stream = updates;
289                        region_errs.push(errors);
290                    }
291
292                    inner_errs.push(
293                        differential_dataflow::collection::concatenate(region, region_errs)
294                            .leave_region(inner),
295                    );
296                    update_stream.leave_region(inner)
297                });
298
299                join_results.push(path_results);
300            }
301
302            // Concatenate the results of each delta query as the accumulated results.
303            (
304                differential_dataflow::collection::concatenate(inner, join_results)
305                    .leave_region(self.scope),
306                differential_dataflow::collection::concatenate(inner, inner_errs)
307                    .leave_region(self.scope),
308            )
309        });
310        CollectionBundle::from_collections(oks, errs)
311    }
312}
313
314/// Constructs a `half_join` from supplied arguments.
315///
316/// This method exists to factor common logic from four code paths that are generic over the type of trace.
317/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
318/// total order on relations (in order to break ties consistently).
319///
320/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
321/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
322/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
323/// to ensure that any two updates are matched at most once.
324fn build_halfjoin<'scope, T, Tr, CF>(
325    updates: VecCollection<'scope, T, (Row, T), Diff>,
326    trace: Arranged<'scope, Tr>,
327    prev_key: Vec<MirScalarExpr>,
328    prev_thinning: Vec<usize>,
329    comparison: CF,
330    closure: JoinClosure,
331    config_set: Rc<ConfigSet>,
332) -> (
333    VecCollection<'scope, T, (Row, T), Diff>,
334    VecCollection<'scope, T, DataflowError, Diff>,
335)
336where
337    T: RenderTimestamp,
338    Tr: TraceReader<KeyContainer: BatchContainer<Owned = Row>, Time = T, Diff = Diff>
339        + Clone
340        + 'static,
341    for<'a> Tr::Val<'a>: ToDatumIter,
342    CF: Fn(Tr::TimeGat<'_>, &T) -> bool + 'static,
343{
344    let use_half_join2 = ENABLE_HALF_JOIN2.get(&config_set);
345
346    let name = "DeltaJoinKeyPreparation";
347    type CB<C> = CapacityContainerBuilder<C>;
348    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
349        // Reuseable allocation for unpacking.
350        let mut datums = DatumVec::new();
351        move |(row, time)| {
352            let temp_storage = RowArena::new();
353            let datums_local = datums.borrow_with(&row);
354            let mut row_builder = SharedRow::get();
355            row_builder.packer().try_extend(
356                prev_key
357                    .iter()
358                    .map(|e| e.eval(&datums_local, &temp_storage)),
359            )?;
360            let key = row_builder.clone();
361            row_builder
362                .packer()
363                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
364            let row_value = row_builder.clone();
365
366            Ok((key, row_value, time))
367        }
368    });
369    let datums = DatumVec::new();
370
371    if use_half_join2 {
372        build_halfjoin2(updates, trace, comparison, closure, datums, errs)
373    } else {
374        build_halfjoin1(updates, trace, comparison, closure, datums, errs)
375    }
376}
377
378/// `half_join2` implementation (less-quadratic, new default).
379fn build_halfjoin2<'scope, T, Tr, CF>(
380    updates: VecCollection<'scope, T, (Row, Row, T), Diff>,
381    trace: Arranged<'scope, Tr>,
382    comparison: CF,
383    closure: JoinClosure,
384    mut datums: DatumVec,
385    errs: VecCollection<'scope, T, DataflowError, Diff>,
386) -> (
387    VecCollection<'scope, T, (Row, T), Diff>,
388    VecCollection<'scope, T, DataflowError, Diff>,
389)
390where
391    T: RenderTimestamp,
392    Tr: TraceReader<KeyContainer: BatchContainer<Owned = Row>, Time = T, Diff = Diff>
393        + Clone
394        + 'static,
395    for<'a> Tr::Val<'a>: ToDatumIter,
396    CF: Fn(Tr::TimeGat<'_>, &T) -> bool + 'static,
397{
398    type CB<C> = CapacityContainerBuilder<C>;
399
400    if closure.could_error() {
401        let (oks, errs2) = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
402            updates,
403            trace,
404            |time, antichain| {
405                antichain.insert(time.step_back());
406            },
407            comparison,
408            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
409            // in that we seem to yield too much and do too little work when we do.
410            |_timer, count| count > 1_000_000,
411            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
412            move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
413                let mut row_builder = SharedRow::get();
414                let temp_storage = RowArena::new();
415
416                let mut datums_local = datums.borrow();
417                datums_local.extend(key.iter());
418                datums_local.extend(stream_row.iter());
419                datums_local.extend(lookup_row.to_datum_iter());
420
421                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
422
423                for (time, diff2) in output.drain(..) {
424                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
425                    let diff = diff1.clone() * diff2.clone();
426                    let data = ((row, time.clone()), initial.clone(), diff);
427                    use timely::container::PushInto;
428                    session.push_into(data);
429                }
430            },
431        )
432        .ok_err(|(data_time, init_time, diff)| {
433            // TODO(mcsherry): consider `ok_err()` for `Collection`.
434            match data_time {
435                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
436                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
437            }
438        });
439
440        (
441            oks.as_collection().flat_map(|x| x),
442            errs.concat(errs2.as_collection()),
443        )
444    } else {
445        let oks = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
446            updates,
447            trace,
448            |time, antichain| {
449                antichain.insert(time.step_back());
450            },
451            comparison,
452            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
453            // in that we seem to yield too much and do too little work when we do.
454            |_timer, count| count > 1_000_000,
455            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
456            move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
457                if output.is_empty() {
458                    return;
459                }
460
461                let mut row_builder = SharedRow::get();
462                let temp_storage = RowArena::new();
463
464                let mut datums_local = datums.borrow();
465                datums_local.extend(key.iter());
466                datums_local.extend(stream_row.iter());
467                datums_local.extend(lookup_row.to_datum_iter());
468
469                if let Some(row) = closure
470                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
471                    .expect("Closure claimed to never error")
472                {
473                    for (time, diff2) in output.drain(..) {
474                        let diff = diff1.clone() * diff2.clone();
475                        use timely::container::PushInto;
476                        session.push_into(((row.clone(), time.clone()), initial.clone(), diff));
477                    }
478                }
479            },
480        );
481
482        (oks.as_collection(), errs)
483    }
484}
485
486/// Original `half_join` implementation (fallback).
487fn build_halfjoin1<'scope, T, Tr, CF>(
488    updates: VecCollection<'scope, T, (Row, Row, T), Diff>,
489    trace: Arranged<'scope, Tr>,
490    comparison: CF,
491    closure: JoinClosure,
492    mut datums: DatumVec,
493    errs: VecCollection<'scope, T, DataflowError, Diff>,
494) -> (
495    VecCollection<'scope, T, (Row, T), Diff>,
496    VecCollection<'scope, T, DataflowError, Diff>,
497)
498where
499    T: RenderTimestamp,
500    Tr: TraceReader<KeyContainer: BatchContainer<Owned = Row>, Time = T, Diff = Diff>
501        + Clone
502        + 'static,
503    for<'a> Tr::Val<'a>: ToDatumIter,
504    CF: Fn(Tr::TimeGat<'_>, &T) -> bool + 'static,
505{
506    type CB<C> = CapacityContainerBuilder<C>;
507
508    if closure.could_error() {
509        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
510            updates,
511            trace,
512            |time, antichain| {
513                antichain.insert(time.step_back());
514            },
515            comparison,
516            |_timer, count| count > 1_000_000,
517            move |session: &mut Session<'_, '_, T, CB<Vec<_>>, _>,
518                  key,
519                  stream_row: &Row,
520                  lookup_row,
521                  initial,
522                  diff1,
523                  output| {
524                let mut row_builder = SharedRow::get();
525                let temp_storage = RowArena::new();
526
527                let mut datums_local = datums.borrow();
528                datums_local.extend(key.iter());
529                datums_local.extend(stream_row.iter());
530                datums_local.extend(lookup_row.to_datum_iter());
531
532                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
533
534                for (time, diff2) in output.drain(..) {
535                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
536                    let diff = diff1.clone() * diff2.clone();
537                    let data = ((row, time.clone()), initial.clone(), diff);
538                    session.give(data);
539                }
540            },
541        )
542        .ok_err(|(data_time, init_time, diff)| match data_time {
543            (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
544            (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
545        });
546
547        (
548            oks.as_collection().flat_map(|x| x),
549            errs.concat(errs2.as_collection()),
550        )
551    } else {
552        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
553            updates,
554            trace,
555            |time, antichain| {
556                antichain.insert(time.step_back());
557            },
558            comparison,
559            |_timer, count| count > 1_000_000,
560            move |session: &mut Session<'_, '_, T, CB<Vec<_>>, _>,
561                  key,
562                  stream_row: &Row,
563                  lookup_row,
564                  initial,
565                  diff1,
566                  output| {
567                if output.is_empty() {
568                    return;
569                }
570
571                let mut row_builder = SharedRow::get();
572                let temp_storage = RowArena::new();
573
574                let mut datums_local = datums.borrow();
575                datums_local.extend(key.iter());
576                datums_local.extend(stream_row.iter());
577                datums_local.extend(lookup_row.to_datum_iter());
578
579                if let Some(row) = closure
580                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
581                    .expect("Closure claimed to never error")
582                {
583                    for (time, diff2) in output.drain(..) {
584                        let diff = diff1.clone() * diff2.clone();
585                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
586                    }
587                }
588            },
589        );
590
591        (oks.as_collection(), errs)
592    }
593}
594
595/// Builds the beginning of the update stream of a delta path.
596///
597/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
598/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
599/// updates happening at the same time on different relations.
600fn build_update_stream<'scope, T, Tr>(
601    trace: Arranged<'scope, Tr>,
602    as_of: Antichain<mz_repr::Timestamp>,
603    source_relation: usize,
604    initial_closure: JoinClosure,
605) -> (
606    VecCollection<'scope, T, Row, Diff>,
607    VecCollection<'scope, T, DataflowError, Diff>,
608)
609where
610    T: RenderTimestamp,
611    for<'a, 'b> &'a T: PartialEq<Tr::TimeGat<'b>>,
612    Tr: for<'a> TraceReader<Time = T, Diff = Diff> + Clone + 'static,
613    for<'a> Tr::Key<'a>: ToDatumIter,
614    for<'a> Tr::Val<'a>: ToDatumIter,
615{
616    let mut inner_as_of = Antichain::new();
617    for event_time in as_of.elements().iter() {
618        inner_as_of.insert(<T>::to_inner(event_time.clone()));
619    }
620
621    let (ok_stream, err_stream) =
622        trace
623            .stream
624            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
625                let mut datums = DatumVec::new();
626                Box::new(move |input, ok_output, err_output| {
627                    // Buffer to accumulate contributing (time, diff) pairs for each (key, val).
628                    let mut times_diffs = Vec::default();
629                    input.for_each(|time, data| {
630                        let mut row_builder = SharedRow::get();
631                        let mut ok_session = ok_output.session(&time);
632                        let mut err_session = err_output.session(&time);
633
634                        for wrapper in data.iter() {
635                            let batch = &wrapper;
636                            let mut cursor = batch.cursor();
637                            while let Some(key) = cursor.get_key(batch) {
638                                while let Some(val) = cursor.get_val(batch) {
639                                    // Collect contributing (time, diff) pairs before invoking the closure.
640                                    cursor.map_times(batch, |time, diff| {
641                                        if source_relation == 0
642                                            || inner_as_of.elements().iter().all(|e| e != time)
643                                        {
644                                            // TODO: Consolidate as we push, defensively.
645                                            times_diffs
646                                                .push((Tr::owned_time(time), Tr::owned_diff(diff)));
647                                        }
648                                    });
649                                    differential_dataflow::consolidation::consolidate(
650                                        &mut times_diffs,
651                                    );
652                                    // The can not-uncommonly be empty, if the inbound updates cancel.
653                                    if !times_diffs.is_empty() {
654                                        let temp_storage = RowArena::new();
655
656                                        let mut datums_local = datums.borrow();
657                                        datums_local.extend(key.to_datum_iter());
658                                        datums_local.extend(val.to_datum_iter());
659
660                                        if !initial_closure.is_identity() {
661                                            match initial_closure
662                                                .apply(
663                                                    &mut datums_local,
664                                                    &temp_storage,
665                                                    &mut row_builder,
666                                                )
667                                                .map(|row| row.cloned())
668                                                .transpose()
669                                            {
670                                                Some(Ok(row)) => {
671                                                    for (time, diff) in times_diffs.drain(..) {
672                                                        ok_session.give((row.clone(), time, diff))
673                                                    }
674                                                }
675                                                Some(Err(err)) => {
676                                                    for (time, diff) in times_diffs.drain(..) {
677                                                        err_session.give((err.clone(), time, diff))
678                                                    }
679                                                }
680                                                None => {}
681                                            }
682                                        } else {
683                                            let row = {
684                                                row_builder.packer().extend(&*datums_local);
685                                                row_builder.clone()
686                                            };
687                                            for (time, diff) in times_diffs.drain(..) {
688                                                ok_session.give((row.clone(), time, diff));
689                                            }
690                                        }
691                                    }
692                                    times_diffs.clear();
693
694                                    cursor.step_val(batch);
695                                }
696                                cursor.step_key(batch);
697                            }
698                        }
699                    });
700                })
701            });
702
703    (
704        ok_stream.as_collection(),
705        err_stream.as_collection().map(DataflowError::from),
706    )
707}