mz_compute/render/join/delta_join.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult [DeltaJoinPlan] documentation for details.

#![allow(clippy::op_ref)]

use std::collections::{BTreeMap, BTreeSet};

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_expr::MirScalarExpr;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::{CollectionExt, StreamExt};
use timely::container::{CapacityContainerBuilder, ContainerBuilder};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::operators::{Map, OkErr};
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::Antichain;
use timely::progress::timestamp::Refines;

use crate::render::RenderTimestamp;
use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
use crate::typedefs::{RowRowAgent, RowRowEnter};

impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join is followed by the application of `map_filter_project`, whose
    /// implementation will be pushed into the join pipeline if at all possible.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            let mut err_dedup = BTreeSet::new();

            // Our plan is to iterate through each input relation, and attempt
            // to find a plan that maximally uses existing keys (better: uses
            // existing arrangements, to which we have access).
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

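            // Render one delta query path for each path plan (one per input relation).
            // Each path reacts to updates arriving on its source relation and joins
            // them against arrangements of the remaining relations.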
            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[relation]` and reflects all strictly prior updates and
                // concurrent updates from relations prior to `relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from `relation` and in the order
                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
                    // subset of relations encountered so far, and we will have applied as much as we can
                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
                    // available columns.
                    //
                    // As we go, we will track the physical locations of each intended output column, as well
                    // as the locations of intermediate results from partial application of `map_filter_project`.
                    //
                    // Just before we apply the `lookup` function to perform a join, we will first use our
                    // available information to determine the filtering and logic that we can apply, and
                    // introduce that into the `lookup` logic to cause it to happen in that operator.

                    // Collects error streams for the region scope. Concatenated before leaving the region.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Ensure this input is rendered, and extract its update stream.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    let update_stream = match val {
                        Ok(local) => {
                            let arranged = local.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        Err(trace) => {
                            let arranged = trace.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                            self.shutdown_probe.clone(),
                                        )
                                    }
                                }
                            };
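                        // The ok output of this stage feeds the next stage; error
                        // streams accumulate and are concatenated when leaving the
                        // region.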
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reusable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let mut row_builder = SharedRow::get();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}

/// A session with lifetime `'a` in a scope `G` with a container builder `CB`.
///
/// This shorthand exists primarily for readability.
type SessionFor<'a, G, CB> = Session<
    'a,
    <G as ScopeParent>::Timestamp,
    CB,
    timely::dataflow::channels::pushers::Counter<
        <G as ScopeParent>::Timestamp,
        <CB as ContainerBuilder>::Container,
        Tee<<G as ScopeParent>::Timestamp, <CB as ContainerBuilder>::Container>,
    >,
>;

/// Constructs a `half_join` from supplied arguments.
///
/// This function exists to factor common logic from four code paths that are generic over the type of trace.
/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
/// total order on relations (in order to break ties consistently).
///
/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
/// to ensure that any two updates are matched at most once.
fn build_halfjoin<G, Tr, CF>(
    updates: Collection<G, (Row, G::Timestamp), Diff>,
    trace: Arranged<G, Tr>,
    prev_key: Vec<MirScalarExpr>,
    prev_thinning: Vec<usize>,
    comparison: CF,
    closure: JoinClosure,
    shutdown_probe: ShutdownProbe,
) -> (
    Collection<G, (Row, G::Timestamp), Diff>,
    Collection<G, DataflowError, Diff>,
)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Val<'a>: ToDatumIter,
    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
    let name = "DeltaJoinKeyPreparation";
    type CB<C> = CapacityContainerBuilder<C>;
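    // Evaluate the key expressions against each stream update, producing a
    // `(key, thinned_row, time)` triple. Key evaluation errors are routed to
    // the error output.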
    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
        // Reusable allocation for unpacking.
        let mut datums = DatumVec::new();
        move |(row, time)| {
            let temp_storage = RowArena::new();
            let datums_local = datums.borrow_with(&row);
            let mut row_builder = SharedRow::get();
            row_builder.packer().try_extend(
                prev_key
                    .iter()
                    .map(|e| e.eval(&datums_local, &temp_storage)),
            )?;
            let key = row_builder.clone();
            row_builder
                .packer()
                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
            let row_value = row_builder.clone();

            Ok((key, row_value, time))
        }
    });
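    // Reusable allocation for assembling the (key, stream row, lookup row) datums
    // inside the half-join output closures below.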
    let mut datums = DatumVec::new();

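    // The half-join is instantiated in one of two variants: if the closure can
    // produce evaluation errors, its results are split into ok and error outputs
    // below; otherwise only ok updates are emitted.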
    if closure.could_error() {
        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            &updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut SessionFor<G, CB<Vec<_>>>,
                  key,
                  stream_row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Check the shutdown probe to avoid doing unnecessary work when the dataflow is
                // shutting down.
                if shutdown_probe.in_shutdown() || output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);

                for (time, diff2) in output.drain(..) {
                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
                    let diff = diff1.clone() * diff2.clone();
                    let data = ((row, time.clone()), initial.clone(), diff);
                    session.give(data);
                }
            },
        )
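        // Split the interleaved results: rows for which the closure succeeded
        // continue as ok updates, while evaluation errors are diverted to the
        // error stream, each keeping its initial time and diff.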
        .ok_err(|(data_time, init_time, diff)| {
            // TODO(mcsherry): consider `ok_err()` for `Collection`.
            match data_time {
                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
            }
        });

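        // The ok stream carries `Option` payloads: rows filtered out by the closure
        // are `None` and are dropped by the `flat_map` below. Key-preparation errors
        // and closure evaluation errors are merged into a single error collection.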
        (
            oks.as_collection().flat_map(|x| x),
            errs.concat(&errs2.as_collection()),
        )
    } else {
        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
            &updates,
            trace,
            |time, antichain| {
                antichain.insert(time.step_back());
            },
            comparison,
            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
            // in that we seem to yield too much and do too little work when we do.
            |_timer, count| count > 1_000_000,
            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
            move |session: &mut SessionFor<G, CB<Vec<_>>>,
                  key,
                  stream_row,
                  lookup_row,
                  initial,
                  diff1,
                  output| {
                // Check the shutdown probe to avoid doing unnecessary work when the dataflow is
                // shutting down.
                if shutdown_probe.in_shutdown() || output.is_empty() {
                    return;
                }

                let mut row_builder = SharedRow::get();
                let temp_storage = RowArena::new();

                let mut datums_local = datums.borrow();
                datums_local.extend(key.iter());
                datums_local.extend(stream_row.iter());
                datums_local.extend(lookup_row.to_datum_iter());

                if let Some(row) = closure
                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
                    .expect("Closure claimed to never error")
                {
                    for (time, diff2) in output.drain(..) {
                        let diff = diff1.clone() * diff2.clone();
                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
                    }
                }
            },
        );

        (oks.as_collection(), errs)
    }
}

/// Builds the beginning of the update stream of a delta path.
///
/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
/// updates happening at the same time on different relations.
fn build_update_stream<G, Tr>(
    trace: Arranged<G, Tr>,
    as_of: Antichain<mz_repr::Timestamp>,
    source_relation: usize,
    initial_closure: JoinClosure,
) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
    for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
    for<'a> Tr::Key<'a>: ToDatumIter,
    for<'a> Tr::Val<'a>: ToDatumIter,
{
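    // Translate the dataflow as-of, expressed in outer timestamps, into the inner
    // timestamp type of this scope so it can be compared against batch times below.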
    let mut inner_as_of = Antichain::new();
    for event_time in as_of.elements().iter() {
        inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
    }

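    // Replay the arranged batches as an update stream: traverse each batch with a
    // cursor, skip as-of updates for all but the first relation, and apply the
    // initial closure (when it is not the identity) to each surviving update.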
    let (ok_stream, err_stream) =
        trace
            .stream
            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
                let mut datums = DatumVec::new();
                Box::new(move |input, ok_output, err_output| {
                    input.for_each(|time, data| {
                        let mut row_builder = SharedRow::get();
                        let mut ok_session = ok_output.session(&time);
                        let mut err_session = err_output.session(&time);

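                        // Walk each batch with a cursor, visiting every
                        // (key, val, time, diff) tuple it contains.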
                        for wrapper in data.iter() {
                            let batch = &wrapper;
                            let mut cursor = batch.cursor();
                            while let Some(key) = cursor.get_key(batch) {
                                while let Some(val) = cursor.get_val(batch) {
                                    cursor.map_times(batch, |time, diff| {
                                        // note: only the delta path for the first relation will see
                                        // updates at start-up time
                                        if source_relation == 0
                                            || inner_as_of.elements().iter().all(|e| e != time)
                                        {
                                            let time = Tr::owned_time(time);
                                            let temp_storage = RowArena::new();

                                            let mut datums_local = datums.borrow();
                                            datums_local.extend(key.to_datum_iter());
                                            datums_local.extend(val.to_datum_iter());

                                            if !initial_closure.is_identity() {
                                                match initial_closure
                                                    .apply(
                                                        &mut datums_local,
                                                        &temp_storage,
                                                        &mut row_builder,
                                                    )
                                                    .map(|row| row.cloned())
                                                    .transpose()
                                                {
                                                    Some(Ok(row)) => ok_session.give((
                                                        row,
                                                        time,
                                                        Tr::owned_diff(diff),
                                                    )),
                                                    Some(Err(err)) => err_session.give((
                                                        err,
                                                        time,
                                                        Tr::owned_diff(diff),
                                                    )),
                                                    None => {}
                                                }
                                            } else {
                                                let row = {
                                                    row_builder.packer().extend(&*datums_local);
                                                    row_builder.clone()
                                                };
                                                ok_session.give((row, time, Tr::owned_diff(diff)));
                                            }
                                        }
                                    });
                                    cursor.step_val(batch);
                                }
                                cursor.step_key(batch);
                            }
                        }
                    });
                })
            });

    (
        ok_stream.as_collection(),
        err_stream.as_collection().map(DataflowError::from),
    )
}