// mz_compute/render/join/delta_join.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Delta join execution dataflow construction.
//!
//! Consult the [`DeltaJoinPlan`] documentation for details.

14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use columnar::Columnar;
19use differential_dataflow::IntoOwned;
20use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
21use differential_dataflow::operators::arrange::Arranged;
22use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
23use differential_dataflow::{AsCollection, Collection};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
26use mz_expr::MirScalarExpr;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
29use mz_storage_types::errors::DataflowError;
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::container::CapacityContainerBuilder;
32use timely::dataflow::Scope;
33use timely::dataflow::channels::pact::Pipeline;
34use timely::dataflow::operators::{Map, OkErr};
35use timely::progress::Antichain;
36use timely::progress::timestamp::Refines;
37
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};
40use crate::typedefs::{RowRowAgent, RowRowEnter};
41
42impl<G> Context<G>
43where
44    G: Scope,
45    G::Timestamp: RenderTimestamp,
46    <G::Timestamp as Columnar>::Container: Clone + Send,
47    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
48{
49    /// Renders `MirRelationExpr:Join` using dogs^3 delta query dataflows.
50    ///
51    /// The join is followed by the application of `map_filter_project`, whose
52    /// implementation will be pushed in to the join pipeline if at all possible.
53    pub fn render_delta_join(
54        &self,
55        inputs: Vec<CollectionBundle<G>>,
56        join_plan: DeltaJoinPlan,
57    ) -> CollectionBundle<G> {
58        // We create a new region to contain the dataflow paths for the delta join.
59        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
60            // Collects error streams for the ambient scope.
61            let mut inner_errs = Vec::new();
62
63            // Deduplicate the error streams of multiply used arrangements.
64            let mut err_dedup = BTreeSet::new();
65
66            // Our plan is to iterate through each input relation, and attempt
67            // to find a plan that maximally uses existing keys (better: uses
68            // existing arrangements, to which we have access).
69            let mut join_results = Vec::new();
70
71            // First let's prepare the input arrangements we will need.
72            // This reduces redundant imports, and simplifies the dataflow structure.
73            // As the arrangements are all shared, it should not dramatically improve
74            // the efficiency, but the dataflow simplification is worth doing.
75            let mut arrangements = BTreeMap::new();
76            for path_plan in join_plan.path_plans.iter() {
77                for stage_plan in path_plan.stage_plans.iter() {
78                    let lookup_idx = stage_plan.lookup_relation;
79                    let lookup_key = stage_plan.lookup_key.clone();
80                    arrangements
81                        .entry((lookup_idx, lookup_key.clone()))
82                        .or_insert_with(|| {
83                            match inputs[lookup_idx]
84                                .arrangement(&lookup_key)
85                                .unwrap_or_else(|| {
86                                    panic!(
87                                        "Arrangement alarmingly absent!: {}, {:?}",
88                                        lookup_idx, lookup_key,
89                                    )
90                                }) {
91                                ArrangementFlavor::Local(oks, errs) => {
92                                    if err_dedup.insert((lookup_idx, lookup_key)) {
93                                        inner_errs.push(
94                                            errs.enter_region(inner)
95                                                .as_collection(|k, _v| k.clone()),
96                                        );
97                                    }
98                                    Ok(oks.enter_region(inner))
99                                }
100                                ArrangementFlavor::Trace(_gid, oks, errs) => {
101                                    if err_dedup.insert((lookup_idx, lookup_key)) {
102                                        inner_errs.push(
103                                            errs.enter_region(inner)
104                                                .as_collection(|k, _v| k.clone()),
105                                        );
106                                    }
107                                    Err(oks.enter_region(inner))
108                                }
109                            }
110                        });
111                }
112            }
113
114            for path_plan in join_plan.path_plans {
115                // Deconstruct the stages of the path plan.
116                let DeltaPathPlan {
117                    source_relation,
118                    initial_closure,
119                    stage_plans,
120                    final_closure,
121                    source_key,
122                } = path_plan;
123
124                // This collection determines changes that result from updates inbound
125                // from `inputs[relation]` and reflects all strictly prior updates and
126                // concurrent updates from relations prior to `relation`.
127                let name = format!("delta path {}", source_relation);
128                let path_results = inner.clone().region_named(&name, |region| {
129                    // The plan is to move through each relation, starting from `relation` and in the order
130                    // indicated in `orders[relation]`. At each moment, we will have the columns from the
131                    // subset of relations encountered so far, and we will have applied as much as we can
132                    // of the filters in `equivalences` and the logic in `map_filter_project`, based on the
133                    // available columns.
134                    //
135                    // As we go, we will track the physical locations of each intended output column, as well
136                    // as the locations of intermediate results from partial application of `map_filter_project`.
137                    //
138                    // Just before we apply the `lookup` function to perform a join, we will first use our
139                    // available information to determine the filtering and logic that we can apply, and
140                    // introduce that in to the `lookup` logic to cause it to happen in that operator.
141
142                    // Collects error streams for the region scope. Concats before leaving.
143                    let mut region_errs = Vec::with_capacity(inputs.len());
144
145                    // Ensure this input is rendered, and extract its update stream.
146                    let val = arrangements
147                        .get(&(source_relation, source_key))
148                        .expect("Arrangement promised by the planner is absent!");
149                    let as_of = self.as_of_frontier.clone();
150                    let update_stream = match val {
151                        Ok(local) => {
152                            let arranged = local.enter_region(region);
153                            let (update_stream, err_stream) =
154                                build_update_stream::<_, RowRowAgent<_, _>>(
155                                    arranged,
156                                    as_of,
157                                    source_relation,
158                                    initial_closure,
159                                );
160                            region_errs.push(err_stream);
161                            update_stream
162                        }
163                        Err(trace) => {
164                            let arranged = trace.enter_region(region);
165                            let (update_stream, err_stream) =
166                                build_update_stream::<_, RowRowEnter<_, _, _>>(
167                                    arranged,
168                                    as_of,
169                                    source_relation,
170                                    initial_closure,
171                                );
172                            region_errs.push(err_stream);
173                            update_stream
174                        }
175                    };
176                    // Promote `time` to a datum element.
177                    //
178                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
179                    // while tracking the initial time `init_time` separately and without
180                    // modification. The initial value for both times is the initial time.
181                    let mut update_stream = update_stream
182                        .inner
183                        .map(|(v, t, d)| ((v, t.clone()), t, d))
184                        .as_collection();
185
186                    // Repeatedly update `update_stream` to reflect joins with more and more
187                    // other relations, in the specified order.
188                    for stage_plan in stage_plans {
189                        let DeltaStagePlan {
190                            lookup_relation,
191                            stream_key,
192                            stream_thinning,
193                            lookup_key,
194                            closure,
195                        } = stage_plan;
196
197                        // We require different logic based on the relative order of the two inputs.
198                        // If the `source` relation precedes the `lookup` relation, we present all
199                        // updates with less or equal `time`, and otherwise we present only updates
200                        // with strictly less `time`.
201                        //
202                        // We need to write the logic twice, as there are two types of arrangement
203                        // we might have: either dataflow-local or an imported trace.
204                        let (oks, errs) =
205                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
206                                Ok(local) => {
207                                    if source_relation < lookup_relation {
208                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
209                                            update_stream,
210                                            local.enter_region(region),
211                                            stream_key,
212                                            stream_thinning,
213                                            |t1, t2| t1.le(t2),
214                                            closure,
215                                            self.shutdown_token.clone(),
216                                        )
217                                    } else {
218                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
219                                            update_stream,
220                                            local.enter_region(region),
221                                            stream_key,
222                                            stream_thinning,
223                                            |t1, t2| t1.lt(t2),
224                                            closure,
225                                            self.shutdown_token.clone(),
226                                        )
227                                    }
228                                }
229                                Err(trace) => {
230                                    if source_relation < lookup_relation {
231                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
232                                            update_stream,
233                                            trace.enter_region(region),
234                                            stream_key,
235                                            stream_thinning,
236                                            |t1, t2| t1.le(t2),
237                                            closure,
238                                            self.shutdown_token.clone(),
239                                        )
240                                    } else {
241                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
242                                            update_stream,
243                                            trace.enter_region(region),
244                                            stream_key,
245                                            stream_thinning,
246                                            |t1, t2| t1.lt(t2),
247                                            closure,
248                                            self.shutdown_token.clone(),
249                                        )
250                                    }
251                                }
252                            };
253                        update_stream = oks;
254                        region_errs.push(errs);
255                    }
256
257                    // Delay updates as appropriate.
258                    //
259                    // The `half_join` operator maintains a time that we now discard (the `_`),
260                    // and replace with the `time` that is maintained with the data. The former
261                    // exists to pin a consistent total order on updates throughout the process,
262                    // while allowing `time` to vary upwards as a result of actions on time.
263                    let mut update_stream = update_stream
264                        .inner
265                        .map(|((row, time), _, diff)| (row, time, diff))
266                        .as_collection();
267
268                    // We have completed the join building, but may have work remaining.
269                    // For example, we may have expressions not pushed down (e.g. literals)
270                    // and projections that could not be applied (e.g. column repetition).
271                    if let Some(final_closure) = final_closure {
272                        let name = "DeltaJoinFinalization";
273                        type CB<C> = ConsolidatingContainerBuilder<C>;
274                        let (updates, errors) = update_stream
275                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
276                                // Reuseable allocation for unpacking.
277                                let mut datums = DatumVec::new();
278                                move |row| {
279                                    let binding = SharedRow::get();
280                                    let mut row_builder = binding.borrow_mut();
281                                    let temp_storage = RowArena::new();
282                                    let mut datums_local = datums.borrow_with(&row);
283                                    // TODO(mcsherry): re-use `row` allocation.
284                                    final_closure
285                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
286                                        .map(|row| row.cloned())
287                                        .map_err(DataflowError::from)
288                                        .transpose()
289                                }
290                            });
291
292                        update_stream = updates;
293                        region_errs.push(errors);
294                    }
295
296                    inner_errs.push(
297                        differential_dataflow::collection::concatenate(region, region_errs)
298                            .leave_region(),
299                    );
300                    update_stream.leave_region()
301                });
302
303                join_results.push(path_results);
304            }
305
306            // Concatenate the results of each delta query as the accumulated results.
307            (
308                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
309                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
310            )
311        });
312        CollectionBundle::from_collections(oks, errs)
313    }
314}
315
316/// Constructs a `half_join` from supplied arguments.
317///
318/// This method exists to factor common logic from four code paths that are generic over the type of trace.
319/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
320/// total order on relations (in order to break ties consistently).
321///
322/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
323/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
324/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
325/// to ensure that any two updates are matched at most once.
326fn build_halfjoin<G, Tr, CF>(
327    updates: Collection<G, (Row, G::Timestamp), Diff>,
328    trace: Arranged<G, Tr>,
329    prev_key: Vec<MirScalarExpr>,
330    prev_thinning: Vec<usize>,
331    comparison: CF,
332    closure: JoinClosure,
333    shutdown_token: ShutdownToken,
334) -> (
335    Collection<G, (Row, G::Timestamp), Diff>,
336    Collection<G, DataflowError, Diff>,
337)
338where
339    G: Scope,
340    G::Timestamp: RenderTimestamp,
341    <G::Timestamp as Columnar>::Container: Clone + Send,
342    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
343    Tr: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
344    for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = Row>,
345    for<'a> Tr::Val<'a>: ToDatumIter,
346    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
347{
348    let name = "DeltaJoinKeyPreparation";
349    type CB<C> = CapacityContainerBuilder<C>;
350    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
351        // Reuseable allocation for unpacking.
352        let mut datums = DatumVec::new();
353        move |(row, time)| {
354            let temp_storage = RowArena::new();
355            let datums_local = datums.borrow_with(&row);
356            let binding = SharedRow::get();
357            let mut row_builder = binding.borrow_mut();
358            row_builder.packer().try_extend(
359                prev_key
360                    .iter()
361                    .map(|e| e.eval(&datums_local, &temp_storage)),
362            )?;
363            let key = row_builder.clone();
364            row_builder
365                .packer()
366                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
367            let row_value = row_builder.clone();
368
369            Ok((key, row_value, time))
370        }
371    });
372    let mut datums = DatumVec::new();
373
374    if closure.could_error() {
375        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
376            &updates,
377            trace,
378            |time, antichain| {
379                antichain.insert(time.step_back());
380            },
381            comparison,
382            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
383            // in that we seem to yield too much and do too little work when we do.
384            |_timer, count| count > 1_000_000,
385            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
386            move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
387                // Check the shutdown token to avoid doing unnecessary work when the dataflow is
388                // shutting down.
389                shutdown_token.probe()?;
390
391                let binding = SharedRow::get();
392                let mut row_builder = binding.borrow_mut();
393                let temp_storage = RowArena::new();
394
395                let mut datums_local = datums.borrow();
396                datums_local.extend(key.iter());
397                datums_local.extend(stream_row.iter());
398                datums_local.extend(lookup_row.to_datum_iter());
399
400                let row = closure
401                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
402                    .map(|row| row.cloned());
403                let diff = diff1.clone() * diff2.clone();
404                let dout = (row, time.clone());
405                Some((dout, initial.clone(), diff))
406            },
407        )
408        .inner
409        .ok_err(|(data_time, init_time, diff)| {
410            // TODO(mcsherry): consider `ok_err()` for `Collection`.
411            match data_time {
412                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
413                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
414            }
415        });
416
417        (
418            oks.as_collection().flat_map(|x| x),
419            errs.concat(&errs2.as_collection()),
420        )
421    } else {
422        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
423            &updates,
424            trace,
425            |time, antichain| {
426                antichain.insert(time.step_back());
427            },
428            comparison,
429            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
430            // in that we seem to yield too much and do too little work when we do.
431            |_timer, count| count > 1_000_000,
432            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
433            move |key, stream_row, lookup_row, initial, time, diff1, diff2| {
434                // Check the shutdown token to avoid doing unnecessary work when the dataflow is
435                // shutting down.
436                shutdown_token.probe()?;
437
438                let binding = SharedRow::get();
439                let mut row_builder = binding.borrow_mut();
440                let temp_storage = RowArena::new();
441
442                let mut datums_local = datums.borrow();
443                datums_local.extend(key.iter());
444                datums_local.extend(stream_row.iter());
445                datums_local.extend(lookup_row.to_datum_iter());
446
447                let row = closure
448                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
449                    .map(|row| row.cloned())
450                    .expect("Closure claimed to never errer");
451                let diff = diff1.clone() * diff2.clone();
452                row.map(|r| ((r, time.clone()), initial.clone(), diff))
453            },
454        );
455
456        (oks, errs)
457    }
458}
459
460/// Builds the beginning of the update stream of a delta path.
461///
462/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
463/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
464/// updates happening at the same time on different relations.
465fn build_update_stream<G, Tr>(
466    trace: Arranged<G, Tr>,
467    as_of: Antichain<mz_repr::Timestamp>,
468    source_relation: usize,
469    initial_closure: JoinClosure,
470) -> (Collection<G, Row, Diff>, Collection<G, DataflowError, Diff>)
471where
472    G: Scope,
473    G::Timestamp: RenderTimestamp,
474    <G::Timestamp as Columnar>::Container: Clone + Send,
475    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
476    for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
477    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
478    for<'a> Tr::Key<'a>: ToDatumIter,
479    for<'a> Tr::Val<'a>: ToDatumIter,
480{
481    let mut inner_as_of = Antichain::new();
482    for event_time in as_of.elements().iter() {
483        inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
484    }
485
486    let (ok_stream, err_stream) =
487        trace
488            .stream
489            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
490                let mut datums = DatumVec::new();
491                Box::new(move |input, ok_output, err_output| {
492                    input.for_each(|time, data| {
493                        let binding = SharedRow::get();
494                        let mut row_builder = binding.borrow_mut();
495                        let mut ok_session = ok_output.session(&time);
496                        let mut err_session = err_output.session(&time);
497
498                        for wrapper in data.iter() {
499                            let batch = &wrapper;
500                            let mut cursor = batch.cursor();
501                            while let Some(key) = cursor.get_key(batch) {
502                                while let Some(val) = cursor.get_val(batch) {
503                                    cursor.map_times(batch, |time, diff| {
504                                        // note: only the delta path for the first relation will see
505                                        // updates at start-up time
506                                        if source_relation == 0
507                                            || inner_as_of.elements().iter().all(|e| e != time)
508                                        {
509                                            let time = time.into_owned();
510                                            let temp_storage = RowArena::new();
511
512                                            let mut datums_local = datums.borrow();
513                                            datums_local.extend(key.to_datum_iter());
514                                            datums_local.extend(val.to_datum_iter());
515
516                                            if !initial_closure.is_identity() {
517                                                match initial_closure
518                                                    .apply(
519                                                        &mut datums_local,
520                                                        &temp_storage,
521                                                        &mut row_builder,
522                                                    )
523                                                    .map(|row| row.cloned())
524                                                    .transpose()
525                                                {
526                                                    Some(Ok(row)) => ok_session.give((
527                                                        row,
528                                                        time,
529                                                        diff.into_owned(),
530                                                    )),
531                                                    Some(Err(err)) => err_session.give((
532                                                        err,
533                                                        time,
534                                                        diff.into_owned(),
535                                                    )),
536                                                    None => {}
537                                                }
538                                            } else {
539                                                let row = {
540                                                    row_builder.packer().extend(&*datums_local);
541                                                    row_builder.clone()
542                                                };
543                                                ok_session.give((row, time, diff.into_owned()));
544                                            }
545                                        }
546                                    });
547                                    cursor.step_val(batch);
548                                }
549                                cursor.step_key(batch);
550                            }
551                        }
552                    });
553                })
554            });
555
556    (
557        ok_stream.as_collection(),
558        err_stream.as_collection().map(DataflowError::from),
559    )
560}