mz_compute/render/join/delta_join.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Delta join execution dataflow construction.
11//!
12//! Consult [DeltaJoinPlan] documentation for details.
13
14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
19use differential_dataflow::operators::arrange::Arranged;
20use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
21use differential_dataflow::{AsCollection, Collection};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
24use mz_expr::MirScalarExpr;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::operator::{CollectionExt, SessionFor, StreamExt};
29use timely::container::CapacityContainerBuilder;
30use timely::dataflow::Scope;
31use timely::dataflow::channels::pact::Pipeline;
32use timely::dataflow::operators::{Map, OkErr};
33use timely::progress::Antichain;
34use timely::progress::timestamp::Refines;
35
36use crate::render::RenderTimestamp;
37use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
38use crate::typedefs::{RowRowAgent, RowRowEnter};
39
40impl<G> Context<G>
41where
42 G: Scope,
43 G::Timestamp: RenderTimestamp,
44{
45 /// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows.
46 ///
47 /// The join is followed by the application of `map_filter_project`, whose
48 /// implementation will be pushed in to the join pipeline if at all possible.
49 pub fn render_delta_join(
50 &self,
51 inputs: Vec<CollectionBundle<G>>,
52 join_plan: DeltaJoinPlan,
53 ) -> CollectionBundle<G> {
54 // We create a new region to contain the dataflow paths for the delta join.
55 let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
56 // Collects error streams for the ambient scope.
57 let mut inner_errs = Vec::new();
58
59 // Deduplicate the error streams of multiply used arrangements.
60 let mut err_dedup = BTreeSet::new();
61
62 // Our plan is to iterate through each input relation, and attempt
63 // to find a plan that maximally uses existing keys (better: uses
64 // existing arrangements, to which we have access).
65 let mut join_results = Vec::new();
66
67 // First let's prepare the input arrangements we will need.
68 // This reduces redundant imports, and simplifies the dataflow structure.
69 // As the arrangements are all shared, it should not dramatically improve
70 // the efficiency, but the dataflow simplification is worth doing.
71 let mut arrangements = BTreeMap::new();
72 for path_plan in join_plan.path_plans.iter() {
73 for stage_plan in path_plan.stage_plans.iter() {
74 let lookup_idx = stage_plan.lookup_relation;
75 let lookup_key = stage_plan.lookup_key.clone();
76 arrangements
77 .entry((lookup_idx, lookup_key.clone()))
78 .or_insert_with(|| {
79 match inputs[lookup_idx]
80 .arrangement(&lookup_key)
81 .unwrap_or_else(|| {
82 panic!(
83 "Arrangement alarmingly absent!: {}, {:?}",
84 lookup_idx, lookup_key,
85 )
86 }) {
87 ArrangementFlavor::Local(oks, errs) => {
88 if err_dedup.insert((lookup_idx, lookup_key)) {
89 inner_errs.push(
90 errs.enter_region(inner)
91 .as_collection(|k, _v| k.clone()),
92 );
93 }
94 Ok(oks.enter_region(inner))
95 }
96 ArrangementFlavor::Trace(_gid, oks, errs) => {
97 if err_dedup.insert((lookup_idx, lookup_key)) {
98 inner_errs.push(
99 errs.enter_region(inner)
100 .as_collection(|k, _v| k.clone()),
101 );
102 }
103 Err(oks.enter_region(inner))
104 }
105 }
106 });
107 }
108 }
109
110 for path_plan in join_plan.path_plans {
111 // Deconstruct the stages of the path plan.
112 let DeltaPathPlan {
113 source_relation,
114 initial_closure,
115 stage_plans,
116 final_closure,
117 source_key,
118 } = path_plan;
119
120 // This collection determines changes that result from updates inbound
121 // from `inputs[relation]` and reflects all strictly prior updates and
122 // concurrent updates from relations prior to `relation`.
123 let name = format!("delta path {}", source_relation);
124 let path_results = inner.clone().region_named(&name, |region| {
125 // The plan is to move through each relation, starting from `relation` and in the order
126 // indicated in `orders[relation]`. At each moment, we will have the columns from the
127 // subset of relations encountered so far, and we will have applied as much as we can
128 // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
129 // available columns.
130 //
131 // As we go, we will track the physical locations of each intended output column, as well
132 // as the locations of intermediate results from partial application of `map_filter_project`.
133 //
134 // Just before we apply the `lookup` function to perform a join, we will first use our
135 // available information to determine the filtering and logic that we can apply, and
136 // introduce that in to the `lookup` logic to cause it to happen in that operator.
137
138 // Collects error streams for the region scope. Concats before leaving.
139 let mut region_errs = Vec::with_capacity(inputs.len());
140
141 // Ensure this input is rendered, and extract its update stream.
142 let val = arrangements
143 .get(&(source_relation, source_key))
144 .expect("Arrangement promised by the planner is absent!");
145 let as_of = self.as_of_frontier.clone();
146 let update_stream = match val {
147 Ok(local) => {
148 let arranged = local.enter_region(region);
149 let (update_stream, err_stream) =
150 build_update_stream::<_, RowRowAgent<_, _>>(
151 arranged,
152 as_of,
153 source_relation,
154 initial_closure,
155 );
156 region_errs.push(err_stream);
157 update_stream
158 }
159 Err(trace) => {
160 let arranged = trace.enter_region(region);
161 let (update_stream, err_stream) =
162 build_update_stream::<_, RowRowEnter<_, _, _>>(
163 arranged,
164 as_of,
165 source_relation,
166 initial_closure,
167 );
168 region_errs.push(err_stream);
169 update_stream
170 }
171 };
172 // Promote `time` to a datum element.
173 //
174 // The `half_join` operator manipulates as "data" a pair `(data, time)`,
175 // while tracking the initial time `init_time` separately and without
176 // modification. The initial value for both times is the initial time.
177 let mut update_stream = update_stream
178 .inner
179 .map(|(v, t, d)| ((v, t.clone()), t, d))
180 .as_collection();
181
182 // Repeatedly update `update_stream` to reflect joins with more and more
183 // other relations, in the specified order.
184 for stage_plan in stage_plans {
185 let DeltaStagePlan {
186 lookup_relation,
187 stream_key,
188 stream_thinning,
189 lookup_key,
190 closure,
191 } = stage_plan;
192
193 // We require different logic based on the relative order of the two inputs.
194 // If the `source` relation precedes the `lookup` relation, we present all
195 // updates with less or equal `time`, and otherwise we present only updates
196 // with strictly less `time`.
197 //
198 // We need to write the logic twice, as there are two types of arrangement
199 // we might have: either dataflow-local or an imported trace.
200 let (oks, errs) =
201 match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
202 Ok(local) => {
203 if source_relation < lookup_relation {
204 build_halfjoin::<_, RowRowAgent<_, _>, _>(
205 update_stream,
206 local.enter_region(region),
207 stream_key,
208 stream_thinning,
209 |t1, t2| t1.le(t2),
210 closure,
211 self.shutdown_probe.clone(),
212 )
213 } else {
214 build_halfjoin::<_, RowRowAgent<_, _>, _>(
215 update_stream,
216 local.enter_region(region),
217 stream_key,
218 stream_thinning,
219 |t1, t2| t1.lt(t2),
220 closure,
221 self.shutdown_probe.clone(),
222 )
223 }
224 }
225 Err(trace) => {
226 if source_relation < lookup_relation {
227 build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
228 update_stream,
229 trace.enter_region(region),
230 stream_key,
231 stream_thinning,
232 |t1, t2| t1.le(t2),
233 closure,
234 self.shutdown_probe.clone(),
235 )
236 } else {
237 build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
238 update_stream,
239 trace.enter_region(region),
240 stream_key,
241 stream_thinning,
242 |t1, t2| t1.lt(t2),
243 closure,
244 self.shutdown_probe.clone(),
245 )
246 }
247 }
248 };
249 update_stream = oks;
250 region_errs.push(errs);
251 }
252
253 // Delay updates as appropriate.
254 //
255 // The `half_join` operator maintains a time that we now discard (the `_`),
256 // and replace with the `time` that is maintained with the data. The former
257 // exists to pin a consistent total order on updates throughout the process,
258 // while allowing `time` to vary upwards as a result of actions on time.
259 let mut update_stream = update_stream
260 .inner
261 .map(|((row, time), _, diff)| (row, time, diff))
262 .as_collection();
263
264 // We have completed the join building, but may have work remaining.
265 // For example, we may have expressions not pushed down (e.g. literals)
266 // and projections that could not be applied (e.g. column repetition).
267 if let Some(final_closure) = final_closure {
268 let name = "DeltaJoinFinalization";
269 type CB<C> = ConsolidatingContainerBuilder<C>;
270 let (updates, errors) = update_stream
271 .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
272 // Reuseable allocation for unpacking.
273 let mut datums = DatumVec::new();
274 move |row| {
275 let mut row_builder = SharedRow::get();
276 let temp_storage = RowArena::new();
277 let mut datums_local = datums.borrow_with(&row);
278 // TODO(mcsherry): re-use `row` allocation.
279 final_closure
280 .apply(&mut datums_local, &temp_storage, &mut row_builder)
281 .map(|row| row.cloned())
282 .map_err(DataflowError::from)
283 .transpose()
284 }
285 });
286
287 update_stream = updates;
288 region_errs.push(errors);
289 }
290
291 inner_errs.push(
292 differential_dataflow::collection::concatenate(region, region_errs)
293 .leave_region(),
294 );
295 update_stream.leave_region()
296 });
297
298 join_results.push(path_results);
299 }
300
301 // Concatenate the results of each delta query as the accumulated results.
302 (
303 differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
304 differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
305 )
306 });
307 CollectionBundle::from_collections(oks, errs)
308 }
309}
310
311/// Constructs a `half_join` from supplied arguments.
312///
313/// This method exists to factor common logic from four code paths that are generic over the type of trace.
314/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
315/// total order on relations (in order to break ties consistently).
316///
317/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
318/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
319/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
320/// to ensure that any two updates are matched at most once.
321fn build_halfjoin<G, Tr, CF>(
322 updates: Collection<G, (Row, G::Timestamp), Diff>,
323 trace: Arranged<G, Tr>,
324 prev_key: Vec<MirScalarExpr>,
325 prev_thinning: Vec<usize>,
326 comparison: CF,
327 closure: JoinClosure,
328 shutdown_probe: ShutdownProbe,
329) -> (
330 Collection<G, (Row, G::Timestamp), Diff>,
331 Collection<G, DataflowError, Diff>,
332)
333where
334 G: Scope,
335 G::Timestamp: RenderTimestamp,
336 Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
337 for<'a> Tr::Val<'a>: ToDatumIter,
338 CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
339{
340 let name = "DeltaJoinKeyPreparation";
341 type CB<C> = CapacityContainerBuilder<C>;
342 let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
343 // Reuseable allocation for unpacking.
344 let mut datums = DatumVec::new();
345 move |(row, time)| {
346 let temp_storage = RowArena::new();
347 let datums_local = datums.borrow_with(&row);
348 let mut row_builder = SharedRow::get();
349 row_builder.packer().try_extend(
350 prev_key
351 .iter()
352 .map(|e| e.eval(&datums_local, &temp_storage)),
353 )?;
354 let key = row_builder.clone();
355 row_builder
356 .packer()
357 .extend(prev_thinning.iter().map(|&c| datums_local[c]));
358 let row_value = row_builder.clone();
359
360 Ok((key, row_value, time))
361 }
362 });
363 let mut datums = DatumVec::new();
364
365 if closure.could_error() {
366 let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
367 &updates,
368 trace,
369 |time, antichain| {
370 antichain.insert(time.step_back());
371 },
372 comparison,
373 // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
374 // in that we seem to yield too much and do too little work when we do.
375 |_timer, count| count > 1_000_000,
376 // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
377 move |session: &mut SessionFor<G, CB<Vec<_>>>,
378 key,
379 stream_row,
380 lookup_row,
381 initial,
382 diff1,
383 output| {
384 // Check the shutdown token to avoid doing unnecessary work when the dataflow is
385 // shutting down.
386 if shutdown_probe.in_shutdown() || output.is_empty() {
387 return;
388 }
389
390 let mut row_builder = SharedRow::get();
391 let temp_storage = RowArena::new();
392
393 let mut datums_local = datums.borrow();
394 datums_local.extend(key.iter());
395 datums_local.extend(stream_row.iter());
396 datums_local.extend(lookup_row.to_datum_iter());
397
398 let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
399
400 for (time, diff2) in output.drain(..) {
401 let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
402 let diff = diff1.clone() * diff2.clone();
403 let data = ((row, time.clone()), initial.clone(), diff);
404 session.give(data);
405 }
406 },
407 )
408 .ok_err(|(data_time, init_time, diff)| {
409 // TODO(mcsherry): consider `ok_err()` for `Collection`.
410 match data_time {
411 (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
412 (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
413 }
414 });
415
416 (
417 oks.as_collection().flat_map(|x| x),
418 errs.concat(&errs2.as_collection()),
419 )
420 } else {
421 let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
422 &updates,
423 trace,
424 |time, antichain| {
425 antichain.insert(time.step_back());
426 },
427 comparison,
428 // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
429 // in that we seem to yield too much and do too little work when we do.
430 |_timer, count| count > 1_000_000,
431 // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
432 move |session: &mut SessionFor<G, CB<Vec<_>>>,
433 key,
434 stream_row,
435 lookup_row,
436 initial,
437 diff1,
438 output| {
439 // Check the shutdown token to avoid doing unnecessary work when the dataflow is
440 // shutting down.
441 if shutdown_probe.in_shutdown() || output.is_empty() {
442 return;
443 }
444
445 let mut row_builder = SharedRow::get();
446 let temp_storage = RowArena::new();
447
448 let mut datums_local = datums.borrow();
449 datums_local.extend(key.iter());
450 datums_local.extend(stream_row.iter());
451 datums_local.extend(lookup_row.to_datum_iter());
452
453 if let Some(row) = closure
454 .apply(&mut datums_local, &temp_storage, &mut row_builder)
455 .expect("Closure claimed to never error")
456 {
457 for (time, diff2) in output.drain(..) {
458 let diff = diff1.clone() * diff2.clone();
459 session.give(((row.clone(), time.clone()), initial.clone(), diff));
460 }
461 }
462 },
463 );
464
465 (oks.as_collection(), errs)
466 }
467}
468
469/// Builds the beginning of the update stream of a delta path.
470///
471/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
472/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
473/// updates happening at the same time on different relations.
474fn build_update_stream<G, Tr>(
475 trace: Arranged<G, Tr>,
476 as_of: Antichain<mz_repr::Timestamp>,
477 source_relation: usize,
478 initial_closure: JoinClosure,
479) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
480where
481 G: Scope,
482 G::Timestamp: RenderTimestamp,
483 for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
484 Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
485 for<'a> Tr::Key<'a>: ToDatumIter,
486 for<'a> Tr::Val<'a>: ToDatumIter,
487{
488 let mut inner_as_of = Antichain::new();
489 for event_time in as_of.elements().iter() {
490 inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
491 }
492
493 let (ok_stream, err_stream) =
494 trace
495 .stream
496 .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
497 let mut datums = DatumVec::new();
498 Box::new(move |input, ok_output, err_output| {
499 input.for_each(|time, data| {
500 let mut row_builder = SharedRow::get();
501 let mut ok_session = ok_output.session(&time);
502 let mut err_session = err_output.session(&time);
503
504 for wrapper in data.iter() {
505 let batch = &wrapper;
506 let mut cursor = batch.cursor();
507 while let Some(key) = cursor.get_key(batch) {
508 while let Some(val) = cursor.get_val(batch) {
509 cursor.map_times(batch, |time, diff| {
510 // note: only the delta path for the first relation will see
511 // updates at start-up time
512 if source_relation == 0
513 || inner_as_of.elements().iter().all(|e| e != time)
514 {
515 let time = Tr::owned_time(time);
516 let temp_storage = RowArena::new();
517
518 let mut datums_local = datums.borrow();
519 datums_local.extend(key.to_datum_iter());
520 datums_local.extend(val.to_datum_iter());
521
522 if !initial_closure.is_identity() {
523 match initial_closure
524 .apply(
525 &mut datums_local,
526 &temp_storage,
527 &mut row_builder,
528 )
529 .map(|row| row.cloned())
530 .transpose()
531 {
532 Some(Ok(row)) => ok_session.give((
533 row,
534 time,
535 Tr::owned_diff(diff),
536 )),
537 Some(Err(err)) => err_session.give((
538 err,
539 time,
540 Tr::owned_diff(diff),
541 )),
542 None => {}
543 }
544 } else {
545 let row = {
546 row_builder.packer().extend(&*datums_local);
547 row_builder.clone()
548 };
549 ok_session.give((row, time, Tr::owned_diff(diff)));
550 }
551 }
552 });
553 cursor.step_val(batch);
554 }
555 cursor.step_key(batch);
556 }
557 }
558 });
559 })
560 });
561
562 (
563 ok_stream.as_collection(),
564 err_stream.as_collection().map(DataflowError::from),
565 )
566}