1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult [DeltaJoinPlan] documentation for details.

#![allow(clippy::op_ref)]

use std::collections::{BTreeMap, BTreeSet};

use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection, ExchangeData, Hashable};
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_compute_types::plan::join::JoinClosure;
use mz_expr::MirScalarExpr;
use mz_repr::fixed_length::{FromDatumIter, ToDatumIter};
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use timely::container::columnation::Columnation;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{Map, OkErr};
use timely::dataflow::Scope;
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::render::context::{
    ArrangementFlavor, CollectionBundle, Context, MzArrangement, MzArrangementImport, ShutdownToken,
};
use crate::render::RenderTimestamp;
use crate::typedefs::{RowRowAgent, RowRowEnter};

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed in to the join pipeline if at all possible.
    ///
    /// One delta path is rendered for each entry of `join_plan.path_plans`, each in
    /// its own nested region. The outputs of all paths are concatenated to form the
    /// result collection; the error streams of the arrangements used and of every
    /// path are concatenated to form the error collection of the returned bundle.
    ///
    /// # Panics
    ///
    /// Panics if `inputs` does not provide an arrangement for some stage's
    /// `(lookup_relation, lookup_key)`, or if a path's `(source_relation, source_key)`
    /// is not among the arrangements collected from the stage plans.
    pub fn render_delta_join(
        &mut self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation, and attempt
            // to find a plan that maximally uses existing keys (better: uses
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            //
            // The map is keyed by `(relation index, key expressions)`. Its values use
            // `Result` purely as a two-way tag, not as an error indicator:
            // `Ok(..)` holds a dataflow-local arrangement, `Err(..)` an imported trace.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        // Only runs for a `(relation, key)` pair that has not been seen yet.
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    // Bring the arrangement's errors into the join region once.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    // Bring the arrangement's errors into the join region once.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            // Render one delta path per path plan, consuming the plans as we go.
            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[relation]` and reflects all strictly prior updates and
                // concurrent updates from relations prior to `relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `relation` and in the order
                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
                    // subset of relations encountered so far, and we will have applied as much as we can
                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
                    // available columns.
                    //
                    // As we go, we will track the physical locations of each intended output column, as well
                    // as the locations of intermediate results from partial application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first use our
                    // available information to determine the filtering and logic that we can apply, and
                    // introduce that in to the `lookup` logic to cause it to happen in that operator.

                    // Collects error streams for the region scope. Concats before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    //
                    // The source arrangement is looked up among the arrangements collected
                    // from the stage plans above; no additional arrangement is imported here.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    let update_stream = match val {
                        // Dataflow-local arrangement.
                        Ok(local) => {
                            let arranged = local.enter_region(region);
                            let (update_stream, err_stream) = dispatch_build_update_stream_local(
                                arranged,
                                as_of,
                                source_relation,
                                initial_closure,
                            );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        // Imported trace.
                        Err(trace) => {
                            let arranged = trace.enter_region(region);
                            let (update_stream, err_stream) = dispatch_build_update_stream_trace(
                                arranged,
                                as_of,
                                source_relation,
                                initial_closure,
                            );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        //
                        // The `unwrap` cannot fail: the preparation loop above inserted an entry
                        // for the `(lookup_relation, lookup_key)` of every stage of every path.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        dispatch_build_halfjoin_local(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    } else {
                                        dispatch_build_halfjoin_local(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        dispatch_build_halfjoin_trace(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    } else {
                                        dispatch_build_halfjoin_trace(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    }
                                }
                            };
                        // The output of this stage is the input of the next one.
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let (updates, errors) =
                            update_stream.flat_map_fallible("DeltaJoinFinalization", {
                                // Reusable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let binding = SharedRow::get();
                                    let mut row_builder = binding.borrow_mut();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    //
                                    // `transpose` turns `Result<Option<_>, _>` into
                                    // `Option<Result<_, _>>`, so a `None` from the closure
                                    // yields no output at all.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    // Hand this path's errors to the enclosing join region, and return its results.
                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}

/// Routes half-join construction for a dataflow-local arrangement to [`build_halfjoin`],
/// selecting the trace type that corresponds to the arrangement's variant.
///
/// Every argument other than `trace` is forwarded unchanged; see [`build_halfjoin`] for
/// their meaning and for a description of the returned `(oks, errs)` pair.
fn dispatch_build_halfjoin_local<G, CF>(
    updates: Collection<G, (Row, G::Timestamp), Diff>,
    trace: MzArrangement<G>,
    prev_key: Vec<MirScalarExpr>,
    prev_thinning: Vec<usize>,
    comparison: CF,
    closure: JoinClosure,
    shutdown_token: ShutdownToken,
) -> (
    Collection<G, (Row, G::Timestamp), Diff>,
    Collection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
    CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
{
    // `RowRow` is the only variant, so this pattern is irrefutable. Should a variant be
    // added, this binding stops compiling, which forces the dispatch to be extended.
    let MzArrangement::RowRow(arranged) = trace;
    build_halfjoin::<_, RowRowAgent<_, _>, _>(
        updates,
        arranged,
        prev_key,
        prev_thinning,
        comparison,
        closure,
        shutdown_token,
    )
}

/// Routes half-join construction for an imported trace to [`build_halfjoin`], selecting
/// the trace type that corresponds to the import's variant.
///
/// Every argument other than `trace` is forwarded unchanged; see [`build_halfjoin`] for
/// their meaning and for a description of the returned `(oks, errs)` pair.
fn dispatch_build_halfjoin_trace<G, T, CF>(
    updates: Collection<G, (Row, G::Timestamp), Diff>,
    trace: MzArrangementImport<G, T>,
    prev_key: Vec<MirScalarExpr>,
    prev_thinning: Vec<usize>,
    comparison: CF,
    closure: JoinClosure,
    shutdown_token: ShutdownToken,
) -> (
    Collection<G, (Row, G::Timestamp), Diff>,
    Collection<G, DataflowError, Diff>,
)
where
    G: Scope,
    T: Timestamp + Lattice + Columnation,
    G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
    CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
{
    // `RowRow` is the only variant, so this pattern is irrefutable. Should a variant be
    // added, this binding stops compiling, which forces the dispatch to be extended.
    let MzArrangementImport::RowRow(imported) = trace;
    build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
        updates,
        imported,
        prev_key,
        prev_thinning,
        comparison,
        closure,
        shutdown_token,
    )
}

/// Constructs a `half_join` from supplied arguments.
///
/// This method exists to factor common logic from four code paths that are generic over the type of trace.
/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
/// total order on relations (in order to break ties consistently).
///
/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
/// to ensure that any two updates are matched at most once.
///
/// # Arguments
///
/// * `updates` - the stream side of the join, as `(row, time)` pairs.
/// * `trace` - the arrangement that is looked up for each prepared key.
/// * `prev_key` - expressions evaluated against each stream row to form the lookup key.
/// * `prev_thinning` - indices of the stream row's columns that are kept as the stream value.
/// * `comparison` - the tie-breaking time comparison described above.
/// * `closure` - applied to the datums of key, stream value and lookup value (in that order)
///   of each match; it may produce a row, produce nothing, or fail.
/// * `shutdown_token` - probed for every match; once the probe yields `None` no further
///   output is produced.
///
/// # Returns
///
/// The joined `(row, time)` collection and a collection of errors. Errors from key
/// preparation are always included; errors from `closure` are included only when
/// `closure.could_error()` is true.
///
/// # Panics
///
/// Panics if `closure.could_error()` is false but applying the closure returns an error.
fn build_halfjoin<G, Tr, CF>(
    updates: Collection<G, (Row, G::Timestamp), Diff>,
    trace: Arranged<G, Tr>,
    prev_key: Vec<MirScalarExpr>,
    prev_thinning: Vec<usize>,
    comparison: CF,
    closure: JoinClosure,
    shutdown_token: ShutdownToken,
) -> (
    Collection<G, (Row, G::Timestamp), Diff>,
    Collection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
    Tr: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    Tr::KeyOwned: ExchangeData + Hashable + Default + FromDatumIter + ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
{
    // Turn each `(row, time)` into `(key, thinned row, time)`, diverting rows whose key
    // cannot be computed to the error collection.
    let (updates, errs) = updates.map_fallible("DeltaJoinKeyPreparation", {
        // Reusable allocation for unpacking.
        let mut datums = DatumVec::new();
        let mut key_buf = Tr::KeyOwned::default();
        move |(row, time)| {
            let temp_storage = RowArena::new();
            let datums_local = datums.borrow_with(&row);
            let key = key_buf.try_from_datum_iter(
                prev_key
                    .iter()
                    .map(|e| e.eval(&datums_local, &temp_storage)),
            )?;
            let binding = SharedRow::get();
            let mut row_builder = binding.borrow_mut();
            row_builder
                .packer()
                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
            let row_value = row_builder.clone();

            Ok((key, row_value, time))
        }
    });

    // Reusable allocation for the datums handed to `closure`; moved into whichever of the
    // two result closures below is constructed.
    let mut datums = DatumVec::new();

    if closure.could_error() {
        // Fallible variant: the closure's `Result` travels through the operator as data
        // and is split into separate ok and err streams afterwards.
        let (oks, errs2) = dogsdogsdogs::operators::half_join::half_join_internal_unsafe(
            &updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
                // Check the shutdown token to avoid doing unnecessary work when the dataflow is
                // shutting down.
                shutdown_token.probe()?;

                let binding = SharedRow::get();
                let mut row_builder = binding.borrow_mut();
                let temp_storage = RowArena::new();

                let key = key.to_datum_iter();
                let stream_row = stream_row.to_datum_iter();
                let lookup_row = lookup_row.to_datum_iter();

                let mut datums_local = datums.borrow();
                datums_local.extend(key);
                datums_local.extend(stream_row);
                datums_local.extend(lookup_row);

                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
                let diff = diff1.clone() * diff2.clone();
                let dout = (row, time.clone());
                Some((dout, initial.clone(), diff))
            },
        )
        .inner
        .ok_err(|(data_time, init_time, diff)| {
            // TODO(mcsherry): consider `ok_err()` for `Collection`.
            match data_time {
                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
            }
        });

        (
            // The ok side carries `Option`s; flattening drops the matches for which the
            // closure produced no row.
            oks.as_collection().flat_map(|x| x),
            errs.concat(&errs2.as_collection()),
        )
    } else {
        // Infallible variant: the closure's result is unwrapped in place, so only the key
        // preparation errors are reported.
        let oks = dogsdogsdogs::operators::half_join::half_join_internal_unsafe(
            &updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
                // Check the shutdown token to avoid doing unnecessary work when the dataflow is
                // shutting down.
                shutdown_token.probe()?;

                let binding = SharedRow::get();
                let mut row_builder = binding.borrow_mut();
                let temp_storage = RowArena::new();

                let key = key.to_datum_iter();
                let stream_row = stream_row.to_datum_iter();
                let lookup_row = lookup_row.to_datum_iter();

                let mut datums_local = datums.borrow();
                datums_local.extend(key);
                datums_local.extend(stream_row);
                datums_local.extend(lookup_row);

                let row = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error");
                let diff = diff1.clone() * diff2.clone();
                row.map(|r| ((r, time.clone()), initial.clone(), diff))
            },
        );

        (oks, errs)
    }
}

/// Routes construction of a delta path's update stream for a dataflow-local arrangement
/// to [`build_update_stream`], selecting the trace type that corresponds to the
/// arrangement's variant.
///
/// Every argument other than `trace` is forwarded unchanged.
fn dispatch_build_update_stream_local<G>(
    trace: MzArrangement<G>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
{
    // `RowRow` is the only variant, so this pattern is irrefutable. Should a variant be
    // added, this binding stops compiling, which forces the dispatch to be extended.
    let MzArrangement::RowRow(arranged) = trace;
    build_update_stream::<_, RowRowAgent<_, _>>(
        arranged,
        as_of,
        source_relation,
        initial_closure,
    )
}

/// Routes construction of a delta path's update stream for an imported trace to
/// [`build_update_stream`], selecting the trace type that corresponds to the import's
/// variant.
///
/// Every argument other than `trace` is forwarded unchanged.
fn dispatch_build_update_stream_trace<G, T>(
    trace: MzArrangementImport<G, T>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
    G: Scope,
    T: Timestamp + Lattice + Columnation,
    G::Timestamp: Lattice + crate::render::RenderTimestamp + Refines<T> + Columnation,
{
    // `RowRow` is the only variant, so this pattern is irrefutable. Should a variant be
    // added, this binding stops compiling, which forces the dispatch to be extended.
    let MzArrangementImport::RowRow(imported) = trace;
    build_update_stream::<_, RowRowEnter<_, _, _>>(
        imported,
        as_of,
        source_relation,
        initial_closure,
    )
}

/// Builds the head of a delta path: the stream of updates read out of the source arrangement.
///
/// Each batch arriving on `trace.stream` is walked with a cursor, and every
/// `(key, val, time, diff)` it contains is turned into an output update. The datums of the
/// key followed by those of the value are either packed into a row as-is (when
/// `initial_closure` is the identity) or passed through `initial_closure`, which may produce
/// a row, produce nothing, or fail. Failures are emitted on the error collection.
///
/// Updates whose time is an element of `as_of` are only emitted when `source_relation` is 0.
/// At start-up time only the delta path for the first relation sees updates, since any updates
/// fed to the other delta paths would be discarded anyway due to the tie-breaking logic that
/// avoids double-counting updates happening at the same time on different relations.
fn build_update_stream<G, Tr>(
    trace: Arranged<G, Tr>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
    G: Scope,
    G::Timestamp: crate::render::RenderTimestamp,
    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Key<'a>: ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
{
    // Express the as-of frontier in the timestamp type of this scope.
    let mut scope_as_of = Antichain::new();
    for as_of_time in as_of.elements() {
        scope_as_of.insert(<G::Timestamp>::to_inner(as_of_time.clone()));
    }

    let (oks, errs) = trace
        .stream
        .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
            // Reusable allocation for unpacking.
            let mut datums = DatumVec::new();
            Box::new(move |input, ok_output, err_output| {
                input.for_each(|cap, batches| {
                    let shared_row = SharedRow::get();
                    let mut row_builder = shared_row.borrow_mut();
                    let mut ok_session = ok_output.session(&cap);
                    let mut err_session = err_output.session(&cap);

                    for wrapper in batches.iter() {
                        let batch = &wrapper;
                        let mut cursor = batch.cursor();
                        // Visit every key, and every value of each key.
                        while let Some(key) = cursor.get_key(batch) {
                            while let Some(val) = cursor.get_val(batch) {
                                cursor.map_times(batch, |time, diff| {
                                    // note: only the delta path for the first relation will
                                    // see updates at start-up time
                                    if source_relation != 0
                                        && scope_as_of.elements().contains(time)
                                    {
                                        return;
                                    }

                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow();
                                    datums_local.extend(key.to_datum_iter());
                                    datums_local.extend(val.to_datum_iter());

                                    if initial_closure.is_identity() {
                                        // Nothing to evaluate: pack the datums unchanged.
                                        row_builder.packer().extend(&*datums_local);
                                        let row = row_builder.clone();
                                        ok_session.give((row, time.clone(), diff.clone()));
                                    } else {
                                        let applied = initial_closure
                                            .apply(
                                                &mut datums_local,
                                                &temp_storage,
                                                &mut row_builder,
                                            )
                                            .transpose();
                                        match applied {
                                            Some(Ok(row)) => {
                                                ok_session.give((row, time.clone(), diff.clone()))
                                            }
                                            Some(Err(err)) => {
                                                err_session.give((err, time.clone(), diff.clone()))
                                            }
                                            // The closure filtered this update out.
                                            None => {}
                                        }
                                    }
                                });
                                cursor.step_val(batch);
                            }
                            cursor.step_key(batch);
                        }
                    }
                });
            })
        });

    (
        oks.as_collection(),
        errs.as_collection().map(DataflowError::from),
    )
}