mz_compute/render/join/delta_join.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Delta join execution dataflow construction.
11//!
12//! Consult [DeltaJoinPlan] documentation for details.
13
14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use columnar::Columnar;
19use differential_dataflow::IntoOwned;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::operators::arrange::Arranged;
22use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
23use differential_dataflow::{AsCollection, Collection};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
26use mz_expr::MirScalarExpr;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
29use mz_storage_types::errors::DataflowError;
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::container::CapacityContainerBuilder;
32use timely::dataflow::Scope;
33use timely::dataflow::channels::pact::Pipeline;
34use timely::dataflow::operators::{Map, OkErr};
35use timely::progress::Antichain;
36use timely::progress::timestamp::Refines;
37
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};
40use crate::typedefs::{RowRowAgent, RowRowEnter};
41
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    <G::Timestamp as Columnar>::Container: Clone + Send,
    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed into the join pipeline if at all possible.
    ///
    /// One "delta path" is rendered for each entry of `join_plan.path_plans`. Each path
    /// starts from the update stream of its `source_relation` (optionally transformed by the
    /// path's `initial_closure`). It then joins that stream, stage by stage, against
    /// arrangements of the other inputs using `build_halfjoin`, and finally applies the
    /// optional `final_closure`. The outputs of all paths are concatenated to form the
    /// result. The error streams of every used arrangement and of every fallible step are
    /// concatenated to form the error output.
    ///
    /// # Panics
    ///
    /// Panics if an arrangement named by the plan is not available. This covers both an
    /// arrangement a stage looks up and an arrangement a path uses as its source.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            // NOTE(review): entries are only inserted from inside `or_insert_with`, which runs
            // at most once per `(lookup_idx, lookup_key)`, i.e. the same key used here. This
            // check therefore appears to always succeed. Confirm whether deduplication per
            // relation was intended.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation, and attempt
            // to find a plan that maximally uses existing keys (better: uses
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            //
            // The map is keyed by `(relation index, arrangement key)`. `Result` is used
            // purely as a two-way sum type, not to signal failure:
            // `Ok` holds a dataflow-local arrangement, `Err` holds an imported trace.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[source_relation]`. It reflects all strictly prior updates, and
                // concurrent updates from relations prior to `source_relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `source_relation`
                    // and in the order given by `stage_plans`. At each moment, we will have the
                    // columns from the subset of relations encountered so far. We will also have
                    // applied as much as we can of the filters and of the logic in
                    // `map_filter_project`, based on the available columns.
                    //
                    // As we go, we will track the physical locations of each intended output
                    // column. We also track the locations of intermediate results from partial
                    // application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first
                    // use our available information to determine the filtering and logic that we
                    // can apply. We then introduce that into the `lookup` logic, so that it
                    // happens in that operator.

                    // Collects error streams for the region scope. Concats before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    // The source arrangement is looked up among the arrangements collected
                    // above from the stages' lookups; the planner must guarantee it is one of
                    // them.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    let update_stream = match val {
                        // Dataflow-local arrangement.
                        Ok(local) => {
                            let arranged = local.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        // Imported trace.
                        Err(trace) => {
                            let arranged = trace.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`. Otherwise we present only updates
                        // with strictly less `time`.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_token.clone(),
                                        )
                                    }
                                }
                            };
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`).
                    // We replace it with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reusable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let binding = SharedRow::get();
                                    let mut row_builder = binding.borrow_mut();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    // `transpose` turns `Result<Option<_>>` into
                                    // `Option<Result<_>>`: `None` drops the row, `Some(Err)` is
                                    // routed to the error output.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    // Merge this path's errors and hand them to the enclosing `Join(Delta)` region.
                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}
315
316/// Constructs a `half_join` from supplied arguments.
317///
318/// This method exists to factor common logic from four code paths that are generic over the type of trace.
319/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
320/// total order on relations (in order to break ties consistently).
321///
322/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
323/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
324/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
325/// to ensure that any two updates are matched at most once.
326fn build_halfjoin<G, Tr, CF>(
327 updates: Collection<G, (Row, G::Timestamp), Diff>,
328 trace: Arranged<G, Tr>,
329 prev_key: Vec<MirScalarExpr>,
330 prev_thinning: Vec<usize>,
331 comparison: CF,
332 closure: JoinClosure,
333 shutdown_token: ShutdownToken,
334) -> (
335 Collection<G, (Row, G::Timestamp), Diff>,
336 Collection<G, DataflowError, Diff>,
337)
338where
339 G: Scope,
340 G::Timestamp: RenderTimestamp,
341 <G::Timestamp as Columnar>::Container: Clone + Send,
342 for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
343 Tr: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
344 for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = Row>,
345 for<'a> Tr::Val<'a>: ToDatumIter,
346 CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
347{
348 let name = "DeltaJoinKeyPreparation";
349 type CB<C> = CapacityContainerBuilder<C>;
350 let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
351 // Reuseable allocation for unpacking.
352 let mut datums = DatumVec::new();
353 move |(row, time)| {
354 let temp_storage = RowArena::new();
355 let datums_local = datums.borrow_with(&row);
356 let binding = SharedRow::get();
357 let mut row_builder = binding.borrow_mut();
358 row_builder.packer().try_extend(
359 prev_key
360 .iter()
361 .map(|e| e.eval(&datums_local, &temp_storage)),
362 )?;
363 let key = row_builder.clone();
364 row_builder
365 .packer()
366 .extend(prev_thinning.iter().map(|&c| datums_local[c]));
367 let row_value = row_builder.clone();
368
369 Ok((key, row_value, time))
370 }
371 });
372 let mut datums = DatumVec::new();
373
374 if closure.could_error() {
375 let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
376 &updates,
377 trace,
378 |time, antichain| {
379 antichain.insert(time.step_back());
380 },
381 comparison,
382 // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
383 // in that we seem to yield too much and do too little work when we do.
384 |_timer, count| count > 1_000_000,
385 // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
386 move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
387 // Check the shutdown token to avoid doing unnecessary work when the dataflow is
388 // shutting down.
389 shutdown_token.probe()?;
390
391 let binding = SharedRow::get();
392 let mut row_builder = binding.borrow_mut();
393 let temp_storage = RowArena::new();
394
395 let mut datums_local = datums.borrow();
396 datums_local.extend(key.iter());
397 datums_local.extend(stream_row.iter());
398 datums_local.extend(lookup_row.to_datum_iter());
399
400 let row = closure
401 .apply(&mut datums_local, &temp_storage, &mut row_builder)
402 .map(|row| row.cloned());
403 let diff = diff1.clone() * diff2.clone();
404 let dout = (row, time.clone());
405 Some((dout, initial.clone(), diff))
406 },
407 )
408 .inner
409 .ok_err(|(data_time, init_time, diff)| {
410 // TODO(mcsherry): consider `ok_err()` for `Collection`.
411 match data_time {
412 (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
413 (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
414 }
415 });
416
417 (
418 oks.as_collection().flat_map(|x| x),
419 errs.concat(&errs2.as_collection()),
420 )
421 } else {
422 let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
423 &updates,
424 trace,
425 |time, antichain| {
426 antichain.insert(time.step_back());
427 },
428 comparison,
429 // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
430 // in that we seem to yield too much and do too little work when we do.
431 |_timer, count| count > 1_000_000,
432 // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
433 move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
434 // Check the shutdown token to avoid doing unnecessary work when the dataflow is
435 // shutting down.
436 shutdown_token.probe()?;
437
438 let binding = SharedRow::get();
439 let mut row_builder = binding.borrow_mut();
440 let temp_storage = RowArena::new();
441
442 let mut datums_local = datums.borrow();
443 datums_local.extend(key.iter());
444 datums_local.extend(stream_row.iter());
445 datums_local.extend(lookup_row.to_datum_iter());
446
447 let row = closure
448 .apply(&mut datums_local, &temp_storage, &mut row_builder)
449 .map(|row| row.cloned())
450 .expect("Closure claimed to never errer");
451 let diff = diff1.clone() * diff2.clone();
452 row.map(|r| ((r, time.clone()), initial.clone(), diff))
453 },
454 );
455
456 (oks, errs)
457 }
458}
459
460/// Builds the beginning of the update stream of a delta path.
461///
462/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
463/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
464/// updates happening at the same time on different relations.
465fn build_update_stream<G, Tr>(
466 trace: Arranged<G, Tr>,
467 as_of: Antichain<mz_repr::Timestamp>,
468 source_relation: usize,
469 initial_closure: JoinClosure,
470) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
471where
472 G: Scope,
473 G::Timestamp: RenderTimestamp,
474 <G::Timestamp as Columnar>::Container: Clone + Send,
475 for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
476 for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
477 Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
478 for<'a> Tr::Key<'a>: ToDatumIter,
479 for<'a> Tr::Val<'a>: ToDatumIter,
480{
481 let mut inner_as_of = Antichain::new();
482 for event_time in as_of.elements().iter() {
483 inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
484 }
485
486 let (ok_stream, err_stream) =
487 trace
488 .stream
489 .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
490 let mut datums = DatumVec::new();
491 Box::new(move |input, ok_output, err_output| {
492 input.for_each(|time, data| {
493 let binding = SharedRow::get();
494 let mut row_builder = binding.borrow_mut();
495 let mut ok_session = ok_output.session(&time);
496 let mut err_session = err_output.session(&time);
497
498 for wrapper in data.iter() {
499 let batch = &wrapper;
500 let mut cursor = batch.cursor();
501 while let Some(key) = cursor.get_key(batch) {
502 while let Some(val) = cursor.get_val(batch) {
503 cursor.map_times(batch, |time, diff| {
504 // note: only the delta path for the first relation will see
505 // updates at start-up time
506 if source_relation == 0
507 || inner_as_of.elements().iter().all(|e| e != time)
508 {
509 let time = time.into_owned();
510 let temp_storage = RowArena::new();
511
512 let mut datums_local = datums.borrow();
513 datums_local.extend(key.to_datum_iter());
514 datums_local.extend(val.to_datum_iter());
515
516 if !initial_closure.is_identity() {
517 match initial_closure
518 .apply(
519 &mut datums_local,
520 &temp_storage,
521 &mut row_builder,
522 )
523 .map(|row| row.cloned())
524 .transpose()
525 {
526 Some(Ok(row)) => ok_session.give((
527 row,
528 time,
529 diff.into_owned(),
530 )),
531 Some(Err(err)) => err_session.give((
532 err,
533 time,
534 diff.into_owned(),
535 )),
536 None => {}
537 }
538 } else {
539 let row = {
540 row_builder.packer().extend(&*datums_local);
541 row_builder.clone()
542 };
543 ok_session.give((row, time, diff.into_owned()));
544 }
545 }
546 });
547 cursor.step_val(batch);
548 }
549 cursor.step_key(batch);
550 }
551 }
552 });
553 })
554 });
555
556 (
557 ok_stream.as_collection(),
558 err_stream.as_collection().map(DataflowError::from),
559 )
560}