// mz_compute/render/join/delta_join.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult [DeltaJoinPlan] documentation for details.

#![allow(clippy::op_ref)]
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, VecCollection};
use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2;
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_dyncfg::ConfigSet;
use mz_expr::MirScalarExpr;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::OkErr;
use timely::dataflow::operators::generic::Session;
use timely::dataflow::operators::vec::Map;
use timely::progress::Antichain;
use timely::progress::timestamp::Refines;

use crate::render::RenderTimestamp;
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
use crate::typedefs::{RowRowAgent, RowRowEnter};

46impl<G> Context<G>
47where
48 G: Scope,
49 G::Timestamp: RenderTimestamp,
50{
51 /// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows.
52 ///
53 /// The join is followed by the application of `map_filter_project`, whose
54 /// implementation will be pushed in to the join pipeline if at all possible.
55 pub fn render_delta_join(
56 &self,
57 inputs: Vec<CollectionBundle<G>>,
58 join_plan: DeltaJoinPlan,
59 ) -> CollectionBundle<G> {
60 // We create a new region to contain the dataflow paths for the delta join.
61 let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
62 // Collects error streams for the ambient scope.
63 let mut inner_errs = Vec::new();
64
65 // Deduplicate the error streams of multiply used arrangements.
66 let mut err_dedup = BTreeSet::new();
67
68 // Our plan is to iterate through each input relation, and attempt
69 // to find a plan that maximally uses existing keys (better: uses
70 // existing arrangements, to which we have access).
71 let mut join_results = Vec::new();
72
73 // First let's prepare the input arrangements we will need.
74 // This reduces redundant imports, and simplifies the dataflow structure.
75 // As the arrangements are all shared, it should not dramatically improve
76 // the efficiency, but the dataflow simplification is worth doing.
77 let mut arrangements = BTreeMap::new();
78 for path_plan in join_plan.path_plans.iter() {
79 for stage_plan in path_plan.stage_plans.iter() {
80 let lookup_idx = stage_plan.lookup_relation;
81 let lookup_key = stage_plan.lookup_key.clone();
82 arrangements
83 .entry((lookup_idx, lookup_key.clone()))
84 .or_insert_with(|| {
85 match inputs[lookup_idx]
86 .arrangement(&lookup_key)
87 .unwrap_or_else(|| {
88 panic!(
89 "Arrangement alarmingly absent!: {}, {:?}",
90 lookup_idx, lookup_key,
91 )
92 }) {
93 ArrangementFlavor::Local(oks, errs) => {
94 if err_dedup.insert((lookup_idx, lookup_key)) {
95 inner_errs.push(
96 errs.enter_region(inner)
97 .as_collection(|k, _v| k.clone()),
98 );
99 }
100 Ok(oks.enter_region(inner))
101 }
102 ArrangementFlavor::Trace(_gid, oks, errs) => {
103 if err_dedup.insert((lookup_idx, lookup_key)) {
104 inner_errs.push(
105 errs.enter_region(inner)
106 .as_collection(|k, _v| k.clone()),
107 );
108 }
109 Err(oks.enter_region(inner))
110 }
111 }
112 });
113 }
114 }
115
116 for path_plan in join_plan.path_plans {
117 // Deconstruct the stages of the path plan.
118 let DeltaPathPlan {
119 source_relation,
120 initial_closure,
121 stage_plans,
122 final_closure,
123 source_key,
124 } = path_plan;
125
126 // This collection determines changes that result from updates inbound
127 // from `inputs[relation]` and reflects all strictly prior updates and
128 // concurrent updates from relations prior to `relation`.
129 let name = format!("delta path {}", source_relation);
130 let path_results = inner.clone().region_named(&name, |region| {
131 // The plan is to move through each relation, starting from `relation` and in the order
132 // indicated in `orders[relation]`. At each moment, we will have the columns from the
133 // subset of relations encountered so far, and we will have applied as much as we can
134 // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
135 // available columns.
136 //
137 // As we go, we will track the physical locations of each intended output column, as well
138 // as the locations of intermediate results from partial application of `map_filter_project`.
139 //
140 // Just before we apply the `lookup` function to perform a join, we will first use our
141 // available information to determine the filtering and logic that we can apply, and
142 // introduce that in to the `lookup` logic to cause it to happen in that operator.
143
144 // Collects error streams for the region scope. Concats before leaving.
145 let mut region_errs = Vec::with_capacity(inputs.len());
146
147 // Ensure this input is rendered, and extract its update stream.
148 let val = arrangements
149 .get(&(source_relation, source_key))
150 .expect("Arrangement promised by the planner is absent!");
151 let as_of = self.as_of_frontier.clone();
152 let update_stream = match val {
153 Ok(local) => {
154 let arranged = local.clone().enter_region(region);
155 let (update_stream, err_stream) =
156 build_update_stream::<_, RowRowAgent<_, _>>(
157 arranged,
158 as_of,
159 source_relation,
160 initial_closure,
161 );
162 region_errs.push(err_stream);
163 update_stream
164 }
165 Err(trace) => {
166 let arranged = trace.clone().enter_region(region);
167 let (update_stream, err_stream) =
168 build_update_stream::<_, RowRowEnter<_, _, _>>(
169 arranged,
170 as_of,
171 source_relation,
172 initial_closure,
173 );
174 region_errs.push(err_stream);
175 update_stream
176 }
177 };
178 // Promote `time` to a datum element.
179 //
180 // The `half_join` operator manipulates as "data" a pair `(data, time)`,
181 // while tracking the initial time `init_time` separately and without
182 // modification. The initial value for both times is the initial time.
183 let mut update_stream = update_stream
184 .inner
185 .map(|(v, t, d)| ((v, t.clone()), t, d))
186 .as_collection();
187
188 // Repeatedly update `update_stream` to reflect joins with more and more
189 // other relations, in the specified order.
190 for stage_plan in stage_plans {
191 let DeltaStagePlan {
192 lookup_relation,
193 stream_key,
194 stream_thinning,
195 lookup_key,
196 closure,
197 } = stage_plan;
198
199 // We require different logic based on the relative order of the two inputs.
200 // If the `source` relation precedes the `lookup` relation, we present all
201 // updates with less or equal `time`, and otherwise we present only updates
202 // with strictly less `time`.
203 //
204 // We need to write the logic twice, as there are two types of arrangement
205 // we might have: either dataflow-local or an imported trace.
206 let (oks, errs) =
207 match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
208 Ok(local) => {
209 if source_relation < lookup_relation {
210 build_halfjoin::<_, RowRowAgent<_, _>, _>(
211 update_stream,
212 local.clone().enter_region(region),
213 stream_key,
214 stream_thinning,
215 |t1, t2| t1.le(t2),
216 closure,
217 Rc::clone(&self.config_set),
218 )
219 } else {
220 build_halfjoin::<_, RowRowAgent<_, _>, _>(
221 update_stream,
222 local.clone().enter_region(region),
223 stream_key,
224 stream_thinning,
225 |t1, t2| t1.lt(t2),
226 closure,
227 Rc::clone(&self.config_set),
228 )
229 }
230 }
231 Err(trace) => {
232 if source_relation < lookup_relation {
233 build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
234 update_stream,
235 trace.clone().enter_region(region),
236 stream_key,
237 stream_thinning,
238 |t1, t2| t1.le(t2),
239 closure,
240 Rc::clone(&self.config_set),
241 )
242 } else {
243 build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
244 update_stream,
245 trace.clone().enter_region(region),
246 stream_key,
247 stream_thinning,
248 |t1, t2| t1.lt(t2),
249 closure,
250 Rc::clone(&self.config_set),
251 )
252 }
253 }
254 };
255 update_stream = oks;
256 region_errs.push(errs);
257 }
258
259 // Delay updates as appropriate.
260 //
261 // The `half_join` operator maintains a time that we now discard (the `_`),
262 // and replace with the `time` that is maintained with the data. The former
263 // exists to pin a consistent total order on updates throughout the process,
264 // while allowing `time` to vary upwards as a result of actions on time.
265 let mut update_stream = update_stream
266 .inner
267 .map(|((row, time), _, diff)| (row, time, diff))
268 .as_collection();
269
270 // We have completed the join building, but may have work remaining.
271 // For example, we may have expressions not pushed down (e.g. literals)
272 // and projections that could not be applied (e.g. column repetition).
273 if let Some(final_closure) = final_closure {
274 let name = "DeltaJoinFinalization";
275 type CB<C> = ConsolidatingContainerBuilder<C>;
276 let (updates, errors) = update_stream
277 .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
278 // Reuseable allocation for unpacking.
279 let mut datums = DatumVec::new();
280 move |row| {
281 let mut row_builder = SharedRow::get();
282 let temp_storage = RowArena::new();
283 let mut datums_local = datums.borrow_with(&row);
284 // TODO(mcsherry): re-use `row` allocation.
285 final_closure
286 .apply(&mut datums_local, &temp_storage, &mut row_builder)
287 .map(|row| row.cloned())
288 .map_err(DataflowError::from)
289 .transpose()
290 }
291 });
292
293 update_stream = updates;
294 region_errs.push(errors);
295 }
296
297 inner_errs.push(
298 differential_dataflow::collection::concatenate(region, region_errs)
299 .leave_region(),
300 );
301 update_stream.leave_region()
302 });
303
304 join_results.push(path_results);
305 }
306
307 // Concatenate the results of each delta query as the accumulated results.
308 (
309 differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
310 differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
311 )
312 });
313 CollectionBundle::from_collections(oks, errs)
314 }
315}
316
317/// Constructs a `half_join` from supplied arguments.
318///
319/// This method exists to factor common logic from four code paths that are generic over the type of trace.
320/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
321/// total order on relations (in order to break ties consistently).
322///
323/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
324/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
325/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
326/// to ensure that any two updates are matched at most once.
327fn build_halfjoin<G, Tr, CF>(
328 updates: VecCollection<G, (Row, G::Timestamp), Diff>,
329 trace: Arranged<G, Tr>,
330 prev_key: Vec<MirScalarExpr>,
331 prev_thinning: Vec<usize>,
332 comparison: CF,
333 closure: JoinClosure,
334 config_set: Rc<ConfigSet>,
335) -> (
336 VecCollection<G, (Row, G::Timestamp), Diff>,
337 VecCollection<G, DataflowError, Diff>,
338)
339where
340 G: Scope,
341 G::Timestamp: RenderTimestamp,
342 Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
343 for<'a> Tr::Val<'a>: ToDatumIter,
344 CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
345{
346 let use_half_join2 = ENABLE_HALF_JOIN2.get(&config_set);
347
348 let name = "DeltaJoinKeyPreparation";
349 type CB<C> = CapacityContainerBuilder<C>;
350 let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
351 // Reuseable allocation for unpacking.
352 let mut datums = DatumVec::new();
353 move |(row, time)| {
354 let temp_storage = RowArena::new();
355 let datums_local = datums.borrow_with(&row);
356 let mut row_builder = SharedRow::get();
357 row_builder.packer().try_extend(
358 prev_key
359 .iter()
360 .map(|e| e.eval(&datums_local, &temp_storage)),
361 )?;
362 let key = row_builder.clone();
363 row_builder
364 .packer()
365 .extend(prev_thinning.iter().map(|&c| datums_local[c]));
366 let row_value = row_builder.clone();
367
368 Ok((key, row_value, time))
369 }
370 });
371 let datums = DatumVec::new();
372
373 if use_half_join2 {
374 build_halfjoin2(updates, trace, comparison, closure, datums, errs)
375 } else {
376 build_halfjoin1(updates, trace, comparison, closure, datums, errs)
377 }
378}
379
380/// `half_join2` implementation (less-quadratic, new default).
381fn build_halfjoin2<G, Tr, CF>(
382 updates: VecCollection<G, (Row, Row, G::Timestamp), Diff>,
383 trace: Arranged<G, Tr>,
384 comparison: CF,
385 closure: JoinClosure,
386 mut datums: DatumVec,
387 errs: VecCollection<G, DataflowError, Diff>,
388) -> (
389 VecCollection<G, (Row, G::Timestamp), Diff>,
390 VecCollection<G, DataflowError, Diff>,
391)
392where
393 G: Scope,
394 G::Timestamp: RenderTimestamp,
395 Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
396 for<'a> Tr::Val<'a>: ToDatumIter,
397 CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
398{
399 type CB<C> = CapacityContainerBuilder<C>;
400
401 if closure.could_error() {
402 let (oks, errs2) = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
403 updates,
404 trace,
405 |time, antichain| {
406 antichain.insert(time.step_back());
407 },
408 comparison,
409 // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
410 // in that we seem to yield too much and do too little work when we do.
411 |_timer, count| count > 1_000_000,
412 // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
413 move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
414 let mut row_builder = SharedRow::get();
415 let temp_storage = RowArena::new();
416
417 let mut datums_local = datums.borrow();
418 datums_local.extend(key.iter());
419 datums_local.extend(stream_row.iter());
420 datums_local.extend(lookup_row.to_datum_iter());
421
422 let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
423
424 for (time, diff2) in output.drain(..) {
425 let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
426 let diff = diff1.clone() * diff2.clone();
427 let data = ((row, time.clone()), initial.clone(), diff);
428 use timely::container::PushInto;
429 session.push_into(data);
430 }
431 },
432 )
433 .ok_err(|(data_time, init_time, diff)| {
434 // TODO(mcsherry): consider `ok_err()` for `Collection`.
435 match data_time {
436 (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
437 (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
438 }
439 });
440
441 (
442 oks.as_collection().flat_map(|x| x),
443 errs.concat(errs2.as_collection()),
444 )
445 } else {
446 let oks = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
447 updates,
448 trace,
449 |time, antichain| {
450 antichain.insert(time.step_back());
451 },
452 comparison,
453 // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
454 // in that we seem to yield too much and do too little work when we do.
455 |_timer, count| count > 1_000_000,
456 // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
457 move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
458 if output.is_empty() {
459 return;
460 }
461
462 let mut row_builder = SharedRow::get();
463 let temp_storage = RowArena::new();
464
465 let mut datums_local = datums.borrow();
466 datums_local.extend(key.iter());
467 datums_local.extend(stream_row.iter());
468 datums_local.extend(lookup_row.to_datum_iter());
469
470 if let Some(row) = closure
471 .apply(&mut datums_local, &temp_storage, &mut row_builder)
472 .expect("Closure claimed to never error")
473 {
474 for (time, diff2) in output.drain(..) {
475 let diff = diff1.clone() * diff2.clone();
476 use timely::container::PushInto;
477 session.push_into(((row.clone(), time.clone()), initial.clone(), diff));
478 }
479 }
480 },
481 );
482
483 (oks.as_collection(), errs)
484 }
485}
486
487/// Original `half_join` implementation (fallback).
488fn build_halfjoin1<G, Tr, CF>(
489 updates: VecCollection<G, (Row, Row, G::Timestamp), Diff>,
490 trace: Arranged<G, Tr>,
491 comparison: CF,
492 closure: JoinClosure,
493 mut datums: DatumVec,
494 errs: VecCollection<G, DataflowError, Diff>,
495) -> (
496 VecCollection<G, (Row, G::Timestamp), Diff>,
497 VecCollection<G, DataflowError, Diff>,
498)
499where
500 G: Scope,
501 G::Timestamp: RenderTimestamp,
502 Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
503 for<'a> Tr::Val<'a>: ToDatumIter,
504 CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
505{
506 type CB<C> = CapacityContainerBuilder<C>;
507
508 if closure.could_error() {
509 let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
510 updates,
511 trace,
512 |time, antichain| {
513 antichain.insert(time.step_back());
514 },
515 comparison,
516 |_timer, count| count > 1_000_000,
517 move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
518 key,
519 stream_row: &Row,
520 lookup_row,
521 initial,
522 diff1,
523 output| {
524 let mut row_builder = SharedRow::get();
525 let temp_storage = RowArena::new();
526
527 let mut datums_local = datums.borrow();
528 datums_local.extend(key.iter());
529 datums_local.extend(stream_row.iter());
530 datums_local.extend(lookup_row.to_datum_iter());
531
532 let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
533
534 for (time, diff2) in output.drain(..) {
535 let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
536 let diff = diff1.clone() * diff2.clone();
537 let data = ((row, time.clone()), initial.clone(), diff);
538 session.give(data);
539 }
540 },
541 )
542 .ok_err(|(data_time, init_time, diff)| match data_time {
543 (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
544 (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
545 });
546
547 (
548 oks.as_collection().flat_map(|x| x),
549 errs.concat(errs2.as_collection()),
550 )
551 } else {
552 let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
553 updates,
554 trace,
555 |time, antichain| {
556 antichain.insert(time.step_back());
557 },
558 comparison,
559 |_timer, count| count > 1_000_000,
560 move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
561 key,
562 stream_row: &Row,
563 lookup_row,
564 initial,
565 diff1,
566 output| {
567 if output.is_empty() {
568 return;
569 }
570
571 let mut row_builder = SharedRow::get();
572 let temp_storage = RowArena::new();
573
574 let mut datums_local = datums.borrow();
575 datums_local.extend(key.iter());
576 datums_local.extend(stream_row.iter());
577 datums_local.extend(lookup_row.to_datum_iter());
578
579 if let Some(row) = closure
580 .apply(&mut datums_local, &temp_storage, &mut row_builder)
581 .expect("Closure claimed to never error")
582 {
583 for (time, diff2) in output.drain(..) {
584 let diff = diff1.clone() * diff2.clone();
585 session.give(((row.clone(), time.clone()), initial.clone(), diff));
586 }
587 }
588 },
589 );
590
591 (oks.as_collection(), errs)
592 }
593}
594
595/// Builds the beginning of the update stream of a delta path.
596///
597/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
598/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
599/// updates happening at the same time on different relations.
600fn build_update_stream<G, Tr>(
601 trace: Arranged<G, Tr>,
602 as_of: Antichain<mz_repr::Timestamp>,
603 source_relation: usize,
604 initial_closure: JoinClosure,
605) -> (
606 VecCollection<G, Row, Diff>,
607 VecCollection<G, DataflowError, Diff>,
608)
609where
610 G: Scope,
611 G::Timestamp: RenderTimestamp,
612 for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
613 Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
614 for<'a> Tr::Key<'a>: ToDatumIter,
615 for<'a> Tr::Val<'a>: ToDatumIter,
616{
617 let mut inner_as_of = Antichain::new();
618 for event_time in as_of.elements().iter() {
619 inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
620 }
621
622 let (ok_stream, err_stream) =
623 trace
624 .stream
625 .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
626 let mut datums = DatumVec::new();
627 Box::new(move |input, ok_output, err_output| {
628 // Buffer to accumulate contributing (time, diff) pairs for each (key, val).
629 let mut times_diffs = Vec::default();
630 input.for_each(|time, data| {
631 let mut row_builder = SharedRow::get();
632 let mut ok_session = ok_output.session(&time);
633 let mut err_session = err_output.session(&time);
634
635 for wrapper in data.iter() {
636 let batch = &wrapper;
637 let mut cursor = batch.cursor();
638 while let Some(key) = cursor.get_key(batch) {
639 while let Some(val) = cursor.get_val(batch) {
640 // Collect contributing (time, diff) pairs before invoking the closure.
641 cursor.map_times(batch, |time, diff| {
642 if source_relation == 0
643 || inner_as_of.elements().iter().all(|e| e != time)
644 {
645 // TODO: Consolidate as we push, defensively.
646 times_diffs
647 .push((Tr::owned_time(time), Tr::owned_diff(diff)));
648 }
649 });
650 differential_dataflow::consolidation::consolidate(
651 &mut times_diffs,
652 );
653 // The can not-uncommonly be empty, if the inbound updates cancel.
654 if !times_diffs.is_empty() {
655 let temp_storage = RowArena::new();
656
657 let mut datums_local = datums.borrow();
658 datums_local.extend(key.to_datum_iter());
659 datums_local.extend(val.to_datum_iter());
660
661 if !initial_closure.is_identity() {
662 match initial_closure
663 .apply(
664 &mut datums_local,
665 &temp_storage,
666 &mut row_builder,
667 )
668 .map(|row| row.cloned())
669 .transpose()
670 {
671 Some(Ok(row)) => {
672 for (time, diff) in times_diffs.drain(..) {
673 ok_session.give((row.clone(), time, diff))
674 }
675 }
676 Some(Err(err)) => {
677 for (time, diff) in times_diffs.drain(..) {
678 err_session.give((err.clone(), time, diff))
679 }
680 }
681 None => {}
682 }
683 } else {
684 let row = {
685 row_builder.packer().extend(&*datums_local);
686 row_builder.clone()
687 };
688 for (time, diff) in times_diffs.drain(..) {
689 ok_session.give((row.clone(), time, diff));
690 }
691 }
692 }
693 times_diffs.clear();
694
695 cursor.step_val(batch);
696 }
697 cursor.step_key(batch);
698 }
699 }
700 });
701 })
702 });
703
704 (
705 ok_stream.as_collection(),
706 err_stream.as_collection().map(DataflowError::from),
707 )
708}