mz_compute/render/join/delta_join.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Delta join execution dataflow construction.
11//!
12//! Consult [DeltaJoinPlan] documentation for details.
13
14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use std::rc::Rc;
19
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::operators::arrange::Arranged;
22use differential_dataflow::trace::implementations::BatchContainer;
23use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
24use differential_dataflow::{AsCollection, VecCollection};
25use mz_compute_types::dyncfgs::ENABLE_HALF_JOIN2;
26use mz_compute_types::plan::join::JoinClosure;
27use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
28use mz_dyncfg::ConfigSet;
29use mz_expr::MirScalarExpr;
30use mz_repr::fixed_length::ToDatumIter;
31use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
32use mz_storage_types::errors::DataflowError;
33use mz_timely_util::operator::{CollectionExt, StreamExt};
34use timely::container::CapacityContainerBuilder;
35use timely::dataflow::channels::pact::Pipeline;
36use timely::dataflow::operators::OkErr;
37use timely::dataflow::operators::generic::Session;
38use timely::dataflow::operators::vec::Map;
39use timely::progress::Antichain;
40
41use crate::render::RenderTimestamp;
42use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
43use crate::typedefs::{RowRowAgent, RowRowEnter};
44
impl<'scope, T: RenderTimestamp> Context<'scope, T> {
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed in to the join pipeline if at all possible.
    ///
    /// One dataflow "path" is rendered per entry in `join_plan.path_plans`; each
    /// path reacts to updates arriving at its source relation and joins them
    /// against arrangements of the other relations, and the results of all
    /// paths (and all error streams) are concatenated on the way out.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<'scope, T>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<'scope, T> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation, and attempt
            // to find a plan that maximally uses existing keys (better: uses
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            //
            // Map values use `Ok` for dataflow-local arrangements and `Err` for
            // imported traces; the two have different types and are dispatched on
            // separately at each use site below.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    // The planner promised us this arrangement; its absence is a bug.
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    // Import the error stream only once per (relation, key) pair.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    // Import the error stream only once per (relation, key) pair.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[relation]` and reflects all strictly prior updates and
                // concurrent updates from relations prior to `relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `relation` and in the order
                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
                    // subset of relations encountered so far, and we will have applied as much as we can
                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
                    // available columns.
                    //
                    // As we go, we will track the physical locations of each intended output column, as well
                    // as the locations of intermediate results from partial application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first use our
                    // available information to determine the filtering and logic that we can apply, and
                    // introduce that in to the `lookup` logic to cause it to happen in that operator.

                    // Collects error streams for the region scope. Concats before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    // Dispatch on whether the source arrangement is dataflow-local
                    // (`Ok`) or an imported trace (`Err`); the logic is otherwise identical.
                    let update_stream = match val {
                        Ok(local) => {
                            let arranged = local.clone().enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        Err(trace) => {
                            let arranged = trace.clone().enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`. This tie-breaking ensures each pair of updates
                        // is matched by exactly one path.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            Rc::clone(&self.config_set),
                                        )
                                    }
                                }
                            };
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reuseable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let mut row_builder = SharedRow::get();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    // Consolidate this path's error streams into a single stream before leaving.
                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(inner),
                    );
                    update_stream.leave_region(inner)
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results)
                    .leave_region(self.scope),
                differential_dataflow::collection::concatenate(inner, inner_errs)
                    .leave_region(self.scope),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}
313
314/// Constructs a `half_join` from supplied arguments.
315///
316/// This method exists to factor common logic from four code paths that are generic over the type of trace.
317/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
318/// total order on relations (in order to break ties consistently).
319///
320/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
321/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
322/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
323/// to ensure that any two updates are matched at most once.
324fn build_halfjoin<'scope, T, Tr, CF>(
325 updates: VecCollection<'scope, T, (Row, T), Diff>,
326 trace: Arranged<'scope, Tr>,
327 prev_key: Vec<MirScalarExpr>,
328 prev_thinning: Vec<usize>,
329 comparison: CF,
330 closure: JoinClosure,
331 config_set: Rc<ConfigSet>,
332) -> (
333 VecCollection<'scope, T, (Row, T), Diff>,
334 VecCollection<'scope, T, DataflowError, Diff>,
335)
336where
337 T: RenderTimestamp,
338 Tr: TraceReader<KeyContainer: BatchContainer<Owned = Row>, Time = T, Diff = Diff>
339 + Clone
340 + 'static,
341 for<'a> Tr::Val<'a>: ToDatumIter,
342 CF: Fn(Tr::TimeGat<'_>, &T) -> bool + 'static,
343{
344 let use_half_join2 = ENABLE_HALF_JOIN2.get(&config_set);
345
346 let name = "DeltaJoinKeyPreparation";
347 type CB<C> = CapacityContainerBuilder<C>;
348 let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
349 // Reuseable allocation for unpacking.
350 let mut datums = DatumVec::new();
351 move |(row, time)| {
352 let temp_storage = RowArena::new();
353 let datums_local = datums.borrow_with(&row);
354 let mut row_builder = SharedRow::get();
355 row_builder.packer().try_extend(
356 prev_key
357 .iter()
358 .map(|e| e.eval(&datums_local, &temp_storage)),
359 )?;
360 let key = row_builder.clone();
361 row_builder
362 .packer()
363 .extend(prev_thinning.iter().map(|&c| datums_local[c]));
364 let row_value = row_builder.clone();
365
366 Ok((key, row_value, time))
367 }
368 });
369 let datums = DatumVec::new();
370
371 if use_half_join2 {
372 build_halfjoin2(updates, trace, comparison, closure, datums, errs)
373 } else {
374 build_halfjoin1(updates, trace, comparison, closure, datums, errs)
375 }
376}
377
/// `half_join2` implementation (less-quadratic, new default).
///
/// Builds the join stage using `differential_dogs3`'s `half_join2` operator. Two
/// variants are constructed depending on whether `closure` can produce errors:
/// the fallible variant routes closure errors into a second stream via `ok_err`,
/// while the infallible variant skips that machinery entirely.
fn build_halfjoin2<'scope, T, Tr, CF>(
    updates: VecCollection<'scope, T, (Row, Row, T), Diff>,
    trace: Arranged<'scope, Tr>,
    comparison: CF,
    closure: JoinClosure,
    mut datums: DatumVec,
    errs: VecCollection<'scope, T, DataflowError, Diff>,
) -> (
    VecCollection<'scope, T, (Row, T), Diff>,
    VecCollection<'scope, T, DataflowError, Diff>,
)
where
    T: RenderTimestamp,
    Tr: TraceReader<KeyContainer: BatchContainer<Owned = Row>, Time = T, Diff = Diff>
        + Clone
        + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &T) -> bool + 'static,
{
    type CB<C> = CapacityContainerBuilder<C>;

    if closure.could_error() {
        let (oks, errs2) = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
            updates,
            trace,
            // Frontier hint: step each time back so the operator can retain history it needs.
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble the full datum sequence: key columns, then the thinned
                // stream columns, then the looked-up relation's columns.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // Apply the closure once per (stream, lookup) match; the `Result` is
                // fanned out over all contributing (time, diff) pairs below.
                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    use timely::container::PushInto;
                    session.push_into(data);
                }
            },
        )
        .ok_err(|(data_time, init_time, diff)| {
            // TODO(mcsherry): consider `ok_err()` for `Collection`.
            // Split successful rows from closure errors, preserving times and diffs.
            match data_time {
                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
            }
        });

        (
            // `data` is an `Option<Row>` (the closure may filter); flatten out the `None`s.
            oks.as_collection().flat_map(|x| x),
            errs.concat(errs2.as_collection()),
        )
    } else {
        let oks = differential_dogs3::operators::half_join2::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut CB<Vec<_>>, key, stream_row, lookup_row, initial, diff1, output| {
                // Nothing to emit; skip the row assembly work entirely.
                if output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble key + stream + lookup columns, as in the fallible branch.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // The closure was determined infallible above, so `expect` documents
                // an invariant rather than handling a recoverable error.
                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        use timely::container::PushInto;
                        session.push_into(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        // No closure errors possible; pass through the upstream error collection unchanged.
        (oks.as_collection(), errs)
    }
}
485
/// Original `half_join` implementation (fallback).
///
/// Mirrors [`build_halfjoin2`] but uses the original `half_join` operator and a
/// `Session`-based output interface. Selected when the `ENABLE_HALF_JOIN2`
/// configuration flag is off.
fn build_halfjoin1<'scope, T, Tr, CF>(
    updates: VecCollection<'scope, T, (Row, Row, T), Diff>,
    trace: Arranged<'scope, Tr>,
    comparison: CF,
    closure: JoinClosure,
    mut datums: DatumVec,
    errs: VecCollection<'scope, T, DataflowError, Diff>,
) -> (
    VecCollection<'scope, T, (Row, T), Diff>,
    VecCollection<'scope, T, DataflowError, Diff>,
)
where
    T: RenderTimestamp,
    Tr: TraceReader<KeyContainer: BatchContainer<Owned = Row>, Time = T, Diff = Diff>
        + Clone
        + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &T) -> bool + 'static,
{
    type CB<C> = CapacityContainerBuilder<C>;

    // As in `build_halfjoin2`, fallible closures need the extra `ok_err` split.
    if closure.could_error() {
        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            updates,
            trace,
            // Frontier hint: step each time back so the operator can retain history it needs.
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            |_timer, count| count > 1_000_000,
            move |session: &mut Session<'_, '_, T, CB<Vec<_>>, _>,
                  key,
                  stream_row: &Row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble the full datum sequence: key columns, then the thinned
                // stream columns, then the looked-up relation's columns.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // Apply the closure once; fan the `Result` out over all (time, diff) pairs.
                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    session.give(data);
                }
            },
        )
        // Split successful rows from closure errors, preserving times and diffs.
        .ok_err(|(data_time, init_time, diff)| match data_time {
            (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
            (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
        });

        (
            // `data` is an `Option<Row>` (the closure may filter); flatten out the `None`s.
            oks.as_collection().flat_map(|x| x),
            errs.concat(errs2.as_collection()),
        )
    } else {
        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            |_timer, count| count > 1_000_000,
            move |session: &mut Session<'_, '_, T, CB<Vec<_>>, _>,
                  key,
                  stream_row: &Row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Nothing to emit; skip the row assembly work entirely.
                if output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble key + stream + lookup columns, as in the fallible branch.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                // The closure was determined infallible above, so `expect` documents
                // an invariant rather than handling a recoverable error.
                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        // No closure errors possible; pass through the upstream error collection unchanged.
        (oks.as_collection(), errs)
    }
}
594
/// Builds the beginning of the update stream of a delta path.
///
/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
/// updates happening at the same time on different relations.
///
/// Walks the batches of `trace` cursor-by-cursor, applies `initial_closure` to each
/// `(key, val)` pair, and emits the surviving rows (and any closure errors) with their
/// consolidated `(time, diff)` contributions.
fn build_update_stream<'scope, T, Tr>(
    trace: Arranged<'scope, Tr>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (
    VecCollection<'scope, T, Row, Diff>,
    VecCollection<'scope, T, DataflowError, Diff>,
)
where
    T: RenderTimestamp,
    for<'a, 'b> &'a T: PartialEq<Tr::TimeGat<'b>>,
    Tr: for<'a> TraceReader<Time = T, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Key<'a>: ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
{
    // Translate the outer `as_of` frontier into this scope's timestamp type, so that
    // update times can be compared against it below.
    let mut inner_as_of = Antichain::new();
    for event_time in as_of.elements().iter() {
        inner_as_of.insert(<T>::to_inner(event_time.clone()));
    }

    let (ok_stream, err_stream) =
        trace
            .stream
            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
                // Reuseable allocation for unpacking.
                let mut datums = DatumVec::new();
                Box::new(move |input, ok_output, err_output| {
                    // Buffer to accumulate contributing (time, diff) pairs for each (key, val).
                    let mut times_diffs = Vec::default();
                    input.for_each(|time, data| {
                        let mut row_builder = SharedRow::get();
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);

                        for wrapper in data.iter() {
                            let batch = &wrapper;
                            let mut cursor = batch.cursor();
                            while let Some(key) = cursor.get_key(batch) {
                                while let Some(val) = cursor.get_val(batch) {
                                    // Collect contributing (time, diff) pairs before invoking the closure.
                                    // Updates exactly at the `as_of` (the snapshot) are emitted only for
                                    // the first relation's path, per the tie-breaking described above.
                                    cursor.map_times(batch, |time, diff| {
                                        if source_relation == 0
                                            || inner_as_of.elements().iter().all(|e| e != time)
                                        {
                                            // TODO: Consolidate as we push, defensively.
                                            times_diffs
                                                .push((Tr::owned_time(time), Tr::owned_diff(diff)));
                                        }
                                    });
                                    differential_dataflow::consolidation::consolidate(
                                        &mut times_diffs,
                                    );
                                    // These can, not uncommonly, be empty, if the inbound updates cancel.
                                    if !times_diffs.is_empty() {
                                        let temp_storage = RowArena::new();

                                        // Bind key columns followed by value columns for the closure.
                                        let mut datums_local = datums.borrow();
                                        datums_local.extend(key.to_datum_iter());
                                        datums_local.extend(val.to_datum_iter());

                                        if !initial_closure.is_identity() {
                                            // Closure may filter (None), produce a row (Ok), or error (Err);
                                            // each outcome fans out over the accumulated (time, diff) pairs.
                                            match initial_closure
                                                .apply(
                                                    &mut datums_local,
                                                    &temp_storage,
                                                    &mut row_builder,
                                                )
                                                .map(|row| row.cloned())
                                                .transpose()
                                            {
                                                Some(Ok(row)) => {
                                                    for (time, diff) in times_diffs.drain(..) {
                                                        ok_session.give((row.clone(), time, diff))
                                                    }
                                                }
                                                Some(Err(err)) => {
                                                    for (time, diff) in times_diffs.drain(..) {
                                                        err_session.give((err.clone(), time, diff))
                                                    }
                                                }
                                                None => {}
                                            }
                                        } else {
                                            // Identity closure: emit the (key, val) datums re-packed as a row.
                                            let row = {
                                                row_builder.packer().extend(&*datums_local);
                                                row_builder.clone()
                                            };
                                            for (time, diff) in times_diffs.drain(..) {
                                                ok_session.give((row.clone(), time, diff));
                                            }
                                        }
                                    }
                                    times_diffs.clear();

                                    cursor.step_val(batch);
                                }
                                cursor.step_key(batch);
                            }
                        }
                    });
                })
            });

    (
        ok_stream.as_collection(),
        err_stream.as_collection().map(DataflowError::from),
    )
}