// mz_compute/render/join/delta_join.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Delta join execution dataflow construction.
11//!
12//! Consult [DeltaJoinPlan] documentation for details.
13
14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
19use differential_dataflow::operators::arrange::Arranged;
20use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
21use differential_dataflow::{AsCollection, VecCollection};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
24use mz_expr::MirScalarExpr;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::operator::{CollectionExt, StreamExt};
29use timely::container::CapacityContainerBuilder;
30use timely::dataflow::Scope;
31use timely::dataflow::channels::pact::Pipeline;
32use timely::dataflow::operators::OkErr;
33use timely::dataflow::operators::generic::Session;
34use timely::dataflow::operators::vec::Map;
35use timely::progress::Antichain;
36use timely::progress::timestamp::Refines;
37
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
40use crate::typedefs::{RowRowAgent, RowRowEnter};
41
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed in to the join pipeline if at all possible.
    ///
    /// One dataflow "path" is constructed per input relation; each path reacts to
    /// updates arriving on that relation and looks up matching records in the
    /// arrangements of the other relations, in the order given by the plan.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements, so that
            // each arrangement's errors are surfaced at most once.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation, and attempt
            // to find a plan that maximally uses existing keys (better: uses
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            //
            // The map values use `Result` as a two-variant sum, not for errors:
            // `Ok` holds a dataflow-local arrangement, `Err` holds an imported trace.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[relation]` and reflects all strictly prior updates and
                // concurrent updates from relations prior to `relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `relation` and in the order
                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
                    // subset of relations encountered so far, and we will have applied as much as we can
                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
                    // available columns.
                    //
                    // As we go, we will track the physical locations of each intended output column, as well
                    // as the locations of intermediate results from partial application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first use our
                    // available information to determine the filtering and logic that we can apply, and
                    // introduce that in to the `lookup` logic to cause it to happen in that operator.

                    // Collects error streams for the region scope. Concats before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    // Dispatch on the arrangement flavor: `Ok` is a local arrangement,
                    // `Err` is an imported trace. Both produce the same stream shape.
                    let update_stream = match val {
                        Ok(local) => {
                            let arranged = local.clone().enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        Err(trace) => {
                            let arranged = trace.clone().enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`. This asymmetric tie-break ensures concurrent
                        // updates on different relations are matched exactly once across paths.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.clone().enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                        )
                                    }
                                }
                            };
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reuseable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let mut row_builder = SharedRow::get();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    // Surface this path's errors in the outer join region, and
                    // return the path's ok stream as the region's result.
                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}
308
309/// Constructs a `half_join` from supplied arguments.
310///
311/// This method exists to factor common logic from four code paths that are generic over the type of trace.
312/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
313/// total order on relations (in order to break ties consistently).
314///
315/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
316/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
317/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
318/// to ensure that any two updates are matched at most once.
/// Constructs a `half_join` from supplied arguments.
///
/// This method exists to factor common logic from four code paths that are generic over the type of trace.
/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
/// total order on relations (in order to break ties consistently).
///
/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
/// to ensure that any two updates are matched at most once.
///
/// `prev_key` and `prev_thinning` describe, respectively, how to derive the lookup key
/// from a stream record, and which stream columns to retain alongside that key.
/// `closure` is applied to each matched (key, stream, lookup) triple; if it can error,
/// an error-propagating variant of the operator is built instead of the infallible one.
fn build_halfjoin<G, Tr, CF>(
    updates: VecCollection<G, (Row, G::Timestamp), Diff>,
    trace: Arranged<G, Tr>,
    prev_key: Vec<MirScalarExpr>,
    prev_thinning: Vec<usize>,
    comparison: CF,
    closure: JoinClosure,
) -> (
    VecCollection<G, (Row, G::Timestamp), Diff>,
    VecCollection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
    // Re-key the stream: evaluate `prev_key` expressions to build the lookup key,
    // and thin the record down to the columns named by `prev_thinning`.
    // Key evaluation can error (hence `map_fallible`); errors flow to `errs`.
    let name = "DeltaJoinKeyPreparation";
    type CB<C> = CapacityContainerBuilder<C>;
    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
        // Reuseable allocation for unpacking.
        let mut datums = DatumVec::new();
        move |(row, time)| {
            let temp_storage = RowArena::new();
            let datums_local = datums.borrow_with(&row);
            let mut row_builder = SharedRow::get();
            row_builder.packer().try_extend(
                prev_key
                    .iter()
                    .map(|e| e.eval(&datums_local, &temp_storage)),
            )?;
            let key = row_builder.clone();
            row_builder
                .packer()
                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
            let row_value = row_builder.clone();

            Ok((key, row_value, time))
        }
    });
    let mut datums = DatumVec::new();

    // Two variants: if the closure can error we must route `Result`s through the
    // operator and split them afterwards; otherwise we can emit rows directly.
    if closure.could_error() {
        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
                  key,
                  stream_row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                // Assemble the datum view as key columns, then stream columns,
                // then looked-up columns — the layout the closure expects.
                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    session.give(data);
                }
            },
        )
        .ok_err(|(data_time, init_time, diff)| {
            // TODO(mcsherry): consider `ok_err()` for `Collection`.
            match data_time {
                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
            }
        });

        (
            // `data` is an `Option<Row>` (closure may filter); flatten out the `None`s.
            oks.as_collection().flat_map(|x| x),
            errs.concat(errs2.as_collection()),
        )
    } else {
        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
                  key,
                  stream_row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Nothing matched at any time; skip the row assembly entirely.
                if output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        (oks.as_collection(), errs)
    }
}
457
458/// Builds the beginning of the update stream of a delta path.
459///
460/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
461/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
462/// updates happening at the same time on different relations.
/// Builds the beginning of the update stream of a delta path.
///
/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
/// updates happening at the same time on different relations.
///
/// Concretely: for `source_relation != 0`, updates whose time lies exactly on the
/// `as_of` frontier are filtered out here, before `initial_closure` is applied.
fn build_update_stream<G, Tr>(
    trace: Arranged<G, Tr>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (
    VecCollection<G, Row, Diff>,
    VecCollection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Key<'a>: ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
{
    // Translate the outer `as_of` frontier into this scope's timestamp type,
    // so it can be compared against batch times below.
    let mut inner_as_of = Antichain::new();
    for event_time in as_of.elements().iter() {
        inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
    }

    let (ok_stream, err_stream) =
        trace
            .stream
            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
                let mut datums = DatumVec::new();
                Box::new(move |input, ok_output, err_output| {
                    // Buffer to accumulate contributing (time, diff) pairs for each (key, val).
                    let mut times_diffs = Vec::default();
                    input.for_each(|time, data| {
                        let mut row_builder = SharedRow::get();
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);

                        // Walk each batch with a cursor: for every key, for every
                        // value, gather its (time, diff) history, then emit.
                        for wrapper in data.iter() {
                            let batch = &wrapper;
                            let mut cursor = batch.cursor();
                            while let Some(key) = cursor.get_key(batch) {
                                while let Some(val) = cursor.get_val(batch) {
                                    // Collect contributing (time, diff) pairs before invoking the closure.
                                    cursor.map_times(batch, |time, diff| {
                                        // Relations other than the first drop updates at
                                        // exactly the as-of time (see function docs).
                                        if source_relation == 0
                                            || inner_as_of.elements().iter().all(|e| e != time)
                                        {
                                            // TODO: Consolidate as we push, defensively.
                                            times_diffs
                                                .push((Tr::owned_time(time), Tr::owned_diff(diff)));
                                        }
                                    });
                                    differential_dataflow::consolidation::consolidate(
                                        &mut times_diffs,
                                    );
                                    // These can not-uncommonly be empty, if the inbound updates cancel.
                                    if !times_diffs.is_empty() {
                                        let temp_storage = RowArena::new();

                                        let mut datums_local = datums.borrow();
                                        datums_local.extend(key.to_datum_iter());
                                        datums_local.extend(val.to_datum_iter());

                                        if !initial_closure.is_identity() {
                                            // The closure may filter (None), produce a row (Ok),
                                            // or error (Err); route each outcome accordingly.
                                            match initial_closure
                                                .apply(
                                                    &mut datums_local,
                                                    &temp_storage,
                                                    &mut row_builder,
                                                )
                                                .map(|row| row.cloned())
                                                .transpose()
                                            {
                                                Some(Ok(row)) => {
                                                    for (time, diff) in times_diffs.drain(..) {
                                                        ok_session.give((row.clone(), time, diff))
                                                    }
                                                }
                                                Some(Err(err)) => {
                                                    for (time, diff) in times_diffs.drain(..) {
                                                        err_session.give((err.clone(), time, diff))
                                                    }
                                                }
                                                None => {}
                                            }
                                        } else {
                                            // Identity closure: pack key and value datums directly.
                                            let row = {
                                                row_builder.packer().extend(&*datums_local);
                                                row_builder.clone()
                                            };
                                            for (time, diff) in times_diffs.drain(..) {
                                                ok_session.give((row.clone(), time, diff));
                                            }
                                        }
                                    }
                                    // Reset the buffer for the next (key, val) pair.
                                    times_diffs.clear();

                                    cursor.step_val(batch);
                                }
                                cursor.step_key(batch);
                            }
                        }
                    });
                })
            });

    (
        ok_stream.as_collection(),
        err_stream.as_collection().map(DataflowError::from),
    )
}