mz_compute/render/join/delta_join.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult [DeltaJoinPlan] documentation for details.
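//!
//! As a rough sketch (the [DeltaJoinPlan] docs are authoritative): a delta join renders one
//! dataflow path per input relation. Each path reacts to updates arriving on its source
//! relation and joins them against arrangements of the remaining relations, one
//! [DeltaStagePlan] at a time, in a planner-chosen order. For a three-way join of `A`, `B`,
//! and `C`, one path handles changes to `A`, another changes to `B`, and another changes to
//! `C`; their outputs are concatenated to form the join's output. Updates that occur at the
//! same timestamp on different relations are matched exactly once, using the tie-breaking
//! rule described at `build_halfjoin`.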

#![allow(clippy::op_ref)]

use std::collections::{BTreeMap, BTreeSet};

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_expr::MirScalarExpr;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, SessionFor, StreamExt};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::{Map, OkErr};
use timely::progress::Antichain;
use timely::progress::timestamp::Refines;

use crate::render::RenderTimestamp;
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
use crate::typedefs::{RowRowAgent, RowRowEnter};

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed into the join pipeline if at all possible.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation and attempt
            // to find a plan that maximally uses existing keys (more precisely:
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First, let's prepare the input arrangements we will need.
            // This reduces redundant imports and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // efficiency, but the dataflow simplification is worth doing.
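            //
            // Note: the `Result` values stored in this map act as an either-type rather than
            // error handling: `Ok` holds a dataflow-local arrangement, while `Err` holds an
            // arrangement imported from a shared trace. Both variants are handled below.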
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[source_relation]` and reflects all strictly prior updates and
                // concurrent updates from relations prior to `source_relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `source_relation` and
                    // proceeding in the order given by `stage_plans`. At each moment, we will have the
                    // columns from the subset of relations encountered so far, and we will have applied
                    // as much as we can of the filters in `equivalences` and the logic in
                    // `map_filter_project`, based on the available columns.
                    //
                    // As we go, we will track the physical locations of each intended output column, as well
                    // as the locations of intermediate results from partial application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first use our
                    // available information to determine the filtering and logic that we can apply, and
                    // introduce that into the `lookup` logic so it happens in that operator.

                    // Collects error streams for the region scope; concatenated before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    let update_stream = match val {
                        Ok(local) => {
                            let arranged = local.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        Err(trace) => {
                            let arranged = trace.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. Both times start out as the update's original time.
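                    //
                    // For example, an update `(row, t, diff)` becomes `((row, t), t, diff)` here.
                    // Subsequent half-join stages may advance the `t` embedded in the data, while
                    // the outer timely timestamp remains the original time until it is discarded
                    // after the last stage.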
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
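                        //
                        // For example, with two relations (0, 1): the path sourced at relation 0
                        // matches updates from relation 1 with `le`, while the path sourced at
                        // relation 1 matches updates from relation 0 with `lt`, so a pair of
                        // updates at the same timestamp is joined exactly once (on path 0).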
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    }
                                }
                            };
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reusable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let mut row_builder = SharedRow::get();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}

/// Constructs a `half_join` from supplied arguments.
///
/// This method exists to factor common logic from four code paths that are generic over the type of trace.
/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
/// total order on relations (in order to break ties consistently).
///
/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
/// to ensure that any two updates are matched at most once.
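///
/// A minimal sketch of the tie-breaking rule, using plain integer timestamps rather than the
/// arranged trace types used below:
///
/// ```
/// // Two updates land at the same timestamp, one on relation 0 and one on relation 1.
/// let (time_on_0, time_on_1) = (3u64, 3u64);
/// // The path sourced at relation 0 looks up relation 1 with `le`: the pair matches here.
/// assert!(time_on_1 <= time_on_0);
/// // The path sourced at relation 1 looks up relation 0 with `lt`: the pair does not match here.
/// assert!(!(time_on_0 < time_on_1));
/// // Together, the pair of updates is produced exactly once across the two paths.
/// ```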
fn build_halfjoin<G, Tr, CF>(
    updates: Collection<G, (Row, G::Timestamp), Diff>,
    trace: Arranged<G, Tr>,
    prev_key: Vec<MirScalarExpr>,
    prev_thinning: Vec<usize>,
    comparison: CF,
    closure: JoinClosure,
    shutdown_probe: ShutdownProbe,
) -> (
    Collection<G, (Row, G::Timestamp), Diff>,
    Collection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
    let name = "DeltaJoinKeyPreparation";
    type CB<C> = CapacityContainerBuilder<C>;
    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
        // Reusable allocation for unpacking.
        let mut datums = DatumVec::new();
        move |(row, time)| {
            let temp_storage = RowArena::new();
            let datums_local = datums.borrow_with(&row);
            let mut row_builder = SharedRow::get();
            row_builder.packer().try_extend(
                prev_key
                    .iter()
                    .map(|e| e.eval(&datums_local, &temp_storage)),
            )?;
            let key = row_builder.clone();
            row_builder
                .packer()
                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
            let row_value = row_builder.clone();

            Ok((key, row_value, time))
        }
    });
    let mut datums = DatumVec::new();

    if closure.could_error() {
        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            &updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut SessionFor<G, CB<Vec<_>>>,
                  key,
                  stream_row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Check the shutdown token to avoid doing unnecessary work when the dataflow is
                // shutting down.
                if shutdown_probe.in_shutdown() || output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    session.give(data);
                }
            },
        )
        .ok_err(|(data_time, init_time, diff)| {
            // TODO(mcsherry): consider `ok_err()` for `Collection`.
            match data_time {
                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
            }
        });

        (
            oks.as_collection().flat_map(|x| x),
            errs.concat(&errs2.as_collection()),
        )
    } else {
        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            &updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut SessionFor<G, CB<Vec<_>>>,
                  key,
                  stream_row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Check the shutdown token to avoid doing unnecessary work when the dataflow is
                // shutting down.
                if shutdown_probe.in_shutdown() || output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        (oks.as_collection(), errs)
    }
}

/// Builds the beginning of the update stream of a delta path.
///
/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
/// updates happening at the same time on different relations.
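///
/// A minimal sketch of that filter, with the `as_of` frontier and the timestamps modeled as
/// plain integers (hypothetical stand-ins for the real antichain and timestamp types):
///
/// ```
/// let as_of = [5u64];
/// // Relation 0 emits everything; other relations suppress updates at exactly the as_of time.
/// let emits = |source_relation: usize, time: u64| {
///     source_relation == 0 || as_of.iter().all(|e| *e != time)
/// };
/// assert!(emits(0, 5));  // the first relation's path sees start-up updates
/// assert!(!emits(2, 5)); // other paths drop them
/// assert!(emits(2, 6));  // but see all later updates
/// ```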
fn build_update_stream<G, Tr>(
    trace: Arranged<G, Tr>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Key<'a>: ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
{
    let mut inner_as_of = Antichain::new();
    for event_time in as_of.elements().iter() {
        inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
    }

    let (ok_stream, err_stream) =
        trace
            .stream
            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
                let mut datums = DatumVec::new();
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut row_builder = SharedRow::get();
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);

                        for wrapper in data.iter() {
                            let batch = &wrapper;
                            let mut cursor = batch.cursor();
                            while let Some(key) = cursor.get_key(batch) {
                                while let Some(val) = cursor.get_val(batch) {
                                    cursor.map_times(batch, |time, diff| {
                                        // note: only the delta path for the first relation will see
                                        // updates at start-up time
                                        if source_relation == 0
                                            || inner_as_of.elements().iter().all(|e| e != time)
                                        {
                                            let time = Tr::owned_time(time);
                                            let temp_storage = RowArena::new();

                                            let mut datums_local = datums.borrow();
                                            datums_local.extend(key.to_datum_iter());
                                            datums_local.extend(val.to_datum_iter());

                                            if !initial_closure.is_identity() {
                                                match initial_closure
                                                    .apply(
                                                        &mut datums_local,
                                                        &temp_storage,
                                                        &mut row_builder,
                                                    )
                                                    .map(|row| row.cloned())
                                                    .transpose()
                                                {
                                                    Some(Ok(row)) => ok_session.give((
                                                        row,
                                                        time,
                                                        Tr::owned_diff(diff),
                                                    )),
                                                    Some(Err(err)) => err_session.give((
                                                        err,
                                                        time,
                                                        Tr::owned_diff(diff),
                                                    )),
                                                    None => {}
                                                }
                                            } else {
                                                let row = {
                                                    row_builder.packer().extend(&*datums_local);
                                                    row_builder.clone()
                                                };
                                                ok_session.give((row, time, Tr::owned_diff(diff)));
                                            }
                                        }
                                    });
                                    cursor.step_val(batch);
                                }
                                cursor.step_key(batch);
                            }
                        }
                    });
                })
            });

    (
        ok_stream.as_collection(),
        err_stream.as_collection().map(DataflowError::from),
    )
}