mz_compute/render/join/
delta_join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Delta join execution dataflow construction.
11//!
12//! Consult [DeltaJoinPlan] documentation for details.
13
14#![allow(clippy::op_ref)]
15
16use std::collections::{BTreeMap, BTreeSet};
17
18use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
19use differential_dataflow::operators::arrange::Arranged;
20use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
21use differential_dataflow::{AsCollection, VecCollection};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
24use mz_expr::MirScalarExpr;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::operator::{CollectionExt, StreamExt};
29use timely::container::CapacityContainerBuilder;
30use timely::dataflow::Scope;
31use timely::dataflow::channels::pact::Pipeline;
32use timely::dataflow::operators::generic::Session;
33use timely::dataflow::operators::{Map, OkErr};
34use timely::progress::Antichain;
35use timely::progress::timestamp::Refines;
36
37use crate::render::RenderTimestamp;
38use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
39use crate::typedefs::{RowRowAgent, RowRowEnter};
40
impl<G> Context<G>
where
    G: Scope,
    G::Timestamp: RenderTimestamp,
{
    /// Renders `MirRelationExpr::Join` using dogs^3 delta query dataflows.
    ///
    /// The join's filtering and mapping logic is carried by the plan's closures
    /// (each path's `initial_closure`, each stage's `closure`, and the optional
    /// `final_closure`), so that as much of it as possible runs inside the join
    /// pipeline rather than after it.
    ///
    /// Everything is built inside a `Join(Delta)` region. For each path plan in
    /// `join_plan`, a nested `delta path {source_relation}` region streams updates
    /// from the path's source arrangement and half-joins them, stage by stage,
    /// against the lookup arrangements. The outputs of all paths are concatenated.
    /// Errors from the input arrangements and from every path are concatenated
    /// into a separate error collection.
    ///
    /// # Panics
    ///
    /// Panics if an arrangement named by the plan (a stage's lookup arrangement
    /// or a path's source arrangement) is not available.
    pub fn render_delta_join(
        &self,
        inputs: Vec<CollectionBundle<G>>,
        join_plan: DeltaJoinPlan,
    ) -> CollectionBundle<G> {
        // We create a new region to contain the dataflow paths for the delta join.
        let (oks, errs) = self.scope.clone().region_named("Join(Delta)", |inner| {
            // Collects error streams for the ambient scope.
            let mut inner_errs = Vec::new();

            // Deduplicate the error streams of multiply used arrangements.
            // NOTE(review): `arrangements` below is keyed by the same `(relation, key)`
            // pair, and this set is only touched from its `or_insert_with` closure,
            // which runs only for vacant entries. So `insert` here appears to always
            // return `true`. The set looks redundant but is harmless.
            let mut err_dedup = BTreeSet::new();

            // Collects the output of each delta path. The choice of source and lookup
            // arrangements for every path was made by the planner and is recorded in
            // `join_plan`.
            let mut join_results = Vec::new();

            // First let's prepare the input arrangements we will need.
            // This reduces redundant imports, and simplifies the dataflow structure.
            // As the arrangements are all shared, it should not dramatically improve
            // the efficiency, but the dataflow simplification is worth doing.
            //
            // Each entry maps `(relation, key)` to the arrangement brought into `inner`:
            // `Ok` holds a dataflow-local arrangement and `Err` an imported trace. Here
            // `Result` serves as a two-variant sum type, not as a failure signal. Only
            // stage lookup arrangements are registered. A path's source arrangement is
            // expected to also appear as some stage's lookup.
            let mut arrangements = BTreeMap::new();
            for path_plan in join_plan.path_plans.iter() {
                for stage_plan in path_plan.stage_plans.iter() {
                    let lookup_idx = stage_plan.lookup_relation;
                    let lookup_key = stage_plan.lookup_key.clone();
                    arrangements
                        .entry((lookup_idx, lookup_key.clone()))
                        .or_insert_with(|| {
                            match inputs[lookup_idx]
                                .arrangement(&lookup_key)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Arrangement alarmingly absent!: {}, {:?}",
                                        lookup_idx, lookup_key,
                                    )
                                }) {
                                ArrangementFlavor::Local(oks, errs) => {
                                    // Surface the arrangement's error keys as an error collection.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Ok(oks.enter_region(inner))
                                }
                                ArrangementFlavor::Trace(_gid, oks, errs) => {
                                    // Same as above, for an imported trace.
                                    if err_dedup.insert((lookup_idx, lookup_key)) {
                                        inner_errs.push(
                                            errs.enter_region(inner)
                                                .as_collection(|k, _v| k.clone()),
                                        );
                                    }
                                    Err(oks.enter_region(inner))
                                }
                            }
                        });
                }
            }

            for path_plan in join_plan.path_plans {
                // Deconstruct the stages of the path plan.
                let DeltaPathPlan {
                    source_relation,
                    initial_closure,
                    stage_plans,
                    final_closure,
                    source_key,
                } = path_plan;

                // This collection determines changes that result from updates inbound
                // from `inputs[source_relation]`. It reflects all strictly prior updates
                // and concurrent updates from relations prior to `source_relation`.
                let name = format!("delta path {}", source_relation);
                let path_results = inner.clone().region_named(&name, |region| {
                    // The plan is to move through each relation, starting from
                    // `source_relation`, in the order given by `stage_plans`. At each
                    // moment we have the columns from the subset of relations encountered
                    // so far. We have also applied as much of the join's filtering and
                    // mapping logic as those columns allow.
                    //
                    // The column bookkeeping and the per-stage logic appear to be
                    // precomputed by the planner into `initial_closure`, each stage's
                    // `closure`, and `final_closure`. Each closure is applied inside the
                    // operator that produces its inputs.

                    // Collects error streams for the region scope. Concats before leaving.
                    let mut region_errs = Vec::with_capacity(inputs.len());

                    // Look up this path's source arrangement (imported into `inner` above)
                    // and extract its update stream.
                    let val = arrangements
                        .get(&(source_relation, source_key))
                        .expect("Arrangement promised by the planner is absent!");
                    let as_of = self.as_of_frontier.clone();
                    let update_stream = match val {
                        Ok(local) => {
                            let arranged = local.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowAgent<_, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                        Err(trace) => {
                            let arranged = trace.enter_region(region);
                            let (update_stream, err_stream) =
                                build_update_stream::<_, RowRowEnter<_, _, _>>(
                                    arranged,
                                    as_of,
                                    source_relation,
                                    initial_closure,
                                );
                            region_errs.push(err_stream);
                            update_stream
                        }
                    };
                    // Promote `time` to a datum element.
                    //
                    // The `half_join` operator manipulates as "data" a pair `(data, time)`,
                    // while tracking the initial time `init_time` separately and without
                    // modification. The initial value for both times is the initial time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|(v, t, d)| ((v, t.clone()), t, d))
                        .as_collection();

                    // Repeatedly update `update_stream` to reflect joins with more and more
                    // other relations, in the specified order.
                    for stage_plan in stage_plans {
                        let DeltaStagePlan {
                            lookup_relation,
                            stream_key,
                            stream_thinning,
                            lookup_key,
                            closure,
                        } = stage_plan;

                        // We require different logic based on the relative order of the two inputs.
                        // If the `source` relation precedes the `lookup` relation, we present all
                        // updates with less or equal `time`, and otherwise we present only updates
                        // with strictly less `time`.
                        //
                        // We need to write the logic twice, as there are two types of arrangement
                        // we might have: either dataflow-local or an imported trace.
                        //
                        // The `unwrap` cannot fail: every stage's lookup arrangement was
                        // registered in `arrangements` before this loop.
                        let (oks, errs) =
                            match arrangements.get(&(lookup_relation, lookup_key)).unwrap() {
                                Ok(local) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowAgent<_, _>, _>(
                                            update_stream,
                                            local.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                        )
                                    }
                                }
                                Err(trace) => {
                                    if source_relation < lookup_relation {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.le(t2),
                                            closure,
                                        )
                                    } else {
                                        build_halfjoin::<_, RowRowEnter<_, _, _>, _>(
                                            update_stream,
                                            trace.enter_region(region),
                                            stream_key,
                                            stream_thinning,
                                            |t1, t2| t1.lt(t2),
                                            closure,
                                        )
                                    }
                                }
                            };
                        update_stream = oks;
                        region_errs.push(errs);
                    }

                    // Delay updates as appropriate.
                    //
                    // The `half_join` operator maintains a time that we now discard (the `_`),
                    // and replace with the `time` that is maintained with the data. The former
                    // exists to pin a consistent total order on updates throughout the process,
                    // while allowing `time` to vary upwards as a result of actions on time.
                    let mut update_stream = update_stream
                        .inner
                        .map(|((row, time), _, diff)| (row, time, diff))
                        .as_collection();

                    // We have completed the join building, but may have work remaining.
                    // For example, we may have expressions not pushed down (e.g. literals)
                    // and projections that could not be applied (e.g. column repetition).
                    if let Some(final_closure) = final_closure {
                        let name = "DeltaJoinFinalization";
                        type CB<C> = ConsolidatingContainerBuilder<C>;
                        let (updates, errors) = update_stream
                            .flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                                // Reusable allocation for unpacking.
                                let mut datums = DatumVec::new();
                                move |row| {
                                    let mut row_builder = SharedRow::get();
                                    let temp_storage = RowArena::new();
                                    let mut datums_local = datums.borrow_with(&row);
                                    // TODO(mcsherry): re-use `row` allocation.
                                    final_closure
                                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                        .map(|row| row.cloned())
                                        .map_err(DataflowError::from)
                                        .transpose()
                                }
                            });

                        update_stream = updates;
                        region_errs.push(errors);
                    }

                    inner_errs.push(
                        differential_dataflow::collection::concatenate(region, region_errs)
                            .leave_region(),
                    );
                    update_stream.leave_region()
                });

                join_results.push(path_results);
            }

            // Concatenate the results of each delta query as the accumulated results.
            (
                differential_dataflow::collection::concatenate(inner, join_results).leave_region(),
                differential_dataflow::collection::concatenate(inner, inner_errs).leave_region(),
            )
        });
        CollectionBundle::from_collections(oks, errs)
    }
}
307
308/// Constructs a `half_join` from supplied arguments.
309///
310/// This method exists to factor common logic from four code paths that are generic over the type of trace.
311/// The `comparison` function should either be `le` or `lt` depending on which relation comes first in the
312/// total order on relations (in order to break ties consistently).
313///
314/// The input and output streams are of pairs `(data, time)` where the `time` component can be greater than
315/// the time of the update. This operator may manipulate `time` as part of this pair, but will not manipulate
316/// the time of the update. This is crucial for correctness, as the total order on times of updates is used
317/// to ensure that any two updates are matched at most once.
318fn build_halfjoin<G, Tr, CF>(
319    updates: VecCollection<G, (Row, G::Timestamp), Diff>,
320    trace: Arranged<G, Tr>,
321    prev_key: Vec<MirScalarExpr>,
322    prev_thinning: Vec<usize>,
323    comparison: CF,
324    closure: JoinClosure,
325) -> (
326    VecCollection<G, (Row, G::Timestamp), Diff>,
327    VecCollection<G, DataflowError, Diff>,
328)
329where
330    G: Scope,
331    G::Timestamp: RenderTimestamp,
332    Tr: TraceReader<KeyOwn = Row, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
333    for<'a> Tr::Val<'a>: ToDatumIter,
334    CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
335{
336    let name = "DeltaJoinKeyPreparation";
337    type CB<C> = CapacityContainerBuilder<C>;
338    let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
339        // Reuseable allocation for unpacking.
340        let mut datums = DatumVec::new();
341        move |(row, time)| {
342            let temp_storage = RowArena::new();
343            let datums_local = datums.borrow_with(&row);
344            let mut row_builder = SharedRow::get();
345            row_builder.packer().try_extend(
346                prev_key
347                    .iter()
348                    .map(|e| e.eval(&datums_local, &temp_storage)),
349            )?;
350            let key = row_builder.clone();
351            row_builder
352                .packer()
353                .extend(prev_thinning.iter().map(|&c| datums_local[c]));
354            let row_value = row_builder.clone();
355
356            Ok((key, row_value, time))
357        }
358    });
359    let mut datums = DatumVec::new();
360
361    if closure.could_error() {
362        let (oks, errs2) = differential_dogs3::operators::half_join::half_join_internal_unsafe(
363            &updates,
364            trace,
365            |time, antichain| {
366                antichain.insert(time.step_back());
367            },
368            comparison,
369            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
370            // in that we seem to yield too much and do too little work when we do.
371            |_timer, count| count > 1_000_000,
372            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
373            move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
374                  key,
375                  stream_row,
376                  lookup_row,
377                  initial,
378                  diff1,
379                  output| {
380                let mut row_builder = SharedRow::get();
381                let temp_storage = RowArena::new();
382
383                let mut datums_local = datums.borrow();
384                datums_local.extend(key.iter());
385                datums_local.extend(stream_row.iter());
386                datums_local.extend(lookup_row.to_datum_iter());
387
388                let row = closure.apply(&mut datums_local, &temp_storage, &mut row_builder);
389
390                for (time, diff2) in output.drain(..) {
391                    let row = row.as_ref().map(|row| row.cloned()).map_err(Clone::clone);
392                    let diff = diff1.clone() * diff2.clone();
393                    let data = ((row, time.clone()), initial.clone(), diff);
394                    session.give(data);
395                }
396            },
397        )
398        .ok_err(|(data_time, init_time, diff)| {
399            // TODO(mcsherry): consider `ok_err()` for `Collection`.
400            match data_time {
401                (Ok(data), time) => Ok((data.map(|data| (data, time)), init_time, diff)),
402                (Err(err), _time) => Err((DataflowError::from(err), init_time, diff)),
403            }
404        });
405
406        (
407            oks.as_collection().flat_map(|x| x),
408            errs.concat(&errs2.as_collection()),
409        )
410    } else {
411        let oks = differential_dogs3::operators::half_join::half_join_internal_unsafe(
412            &updates,
413            trace,
414            |time, antichain| {
415                antichain.insert(time.step_back());
416            },
417            comparison,
418            // TODO(mcsherry): investigate/establish trade-offs here; time based had problems,
419            // in that we seem to yield too much and do too little work when we do.
420            |_timer, count| count > 1_000_000,
421            // TODO(mcsherry): consider `RefOrMut` in `half_join` interface to allow re-use.
422            move |session: &mut Session<'_, '_, G::Timestamp, CB<Vec<_>>, _>,
423                  key,
424                  stream_row,
425                  lookup_row,
426                  initial,
427                  diff1,
428                  output| {
429                if output.is_empty() {
430                    return;
431                }
432
433                let mut row_builder = SharedRow::get();
434                let temp_storage = RowArena::new();
435
436                let mut datums_local = datums.borrow();
437                datums_local.extend(key.iter());
438                datums_local.extend(stream_row.iter());
439                datums_local.extend(lookup_row.to_datum_iter());
440
441                if let Some(row) = closure
442                    .apply(&mut datums_local, &temp_storage, &mut row_builder)
443                    .expect("Closure claimed to never error")
444                {
445                    for (time, diff2) in output.drain(..) {
446                        let diff = diff1.clone() * diff2.clone();
447                        session.give(((row.clone(), time.clone()), initial.clone(), diff));
448                    }
449                }
450            },
451        );
452
453        (oks.as_collection(), errs)
454    }
455}
456
457/// Builds the beginning of the update stream of a delta path.
458///
459/// At start-up time only the delta path for the first relation sees updates, since any updates fed to the
460/// other delta paths would be discarded anyway due to the tie-breaking logic that avoids double-counting
461/// updates happening at the same time on different relations.
462fn build_update_stream<G, Tr>(
463    trace: Arranged<G, Tr>,
464    as_of: Antichain<mz_repr::Timestamp>,
465    source_relation: usize,
466    initial_closure: JoinClosure,
467) -> (
468    VecCollection<G, Row, Diff>,
469    VecCollection<G, DataflowError, Diff>,
470)
471where
472    G: Scope,
473    G::Timestamp: RenderTimestamp,
474    for<'a, 'b> &'a G::Timestamp: PartialEq<Tr::TimeGat<'b>>,
475    Tr: for<'a> TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
476    for<'a> Tr::Key<'a>: ToDatumIter,
477    for<'a> Tr::Val<'a>: ToDatumIter,
478{
479    let mut inner_as_of = Antichain::new();
480    for event_time in as_of.elements().iter() {
481        inner_as_of.insert(<G::Timestamp>::to_inner(event_time.clone()));
482    }
483
484    let (ok_stream, err_stream) =
485        trace
486            .stream
487            .unary_fallible(Pipeline, "UpdateStream", move |_, _| {
488                let mut datums = DatumVec::new();
489                Box::new(move |input, ok_output, err_output| {
490                    input.for_each(|time, data| {
491                        let mut row_builder = SharedRow::get();
492                        let mut ok_session = ok_output.session(&time);
493                        let mut err_session = err_output.session(&time);
494
495                        for wrapper in data.iter() {
496                            let batch = &wrapper;
497                            let mut cursor = batch.cursor();
498                            while let Some(key) = cursor.get_key(batch) {
499                                while let Some(val) = cursor.get_val(batch) {
500                                    cursor.map_times(batch, |time, diff| {
501                                        // note: only the delta path for the first relation will see
502                                        // updates at start-up time
503                                        if source_relation == 0
504                                            || inner_as_of.elements().iter().all(|e| e != time)
505                                        {
506                                            let time = Tr::owned_time(time);
507                                            let temp_storage = RowArena::new();
508
509                                            let mut datums_local = datums.borrow();
510                                            datums_local.extend(key.to_datum_iter());
511                                            datums_local.extend(val.to_datum_iter());
512
513                                            if !initial_closure.is_identity() {
514                                                match initial_closure
515                                                    .apply(
516                                                        &mut datums_local,
517                                                        &temp_storage,
518                                                        &mut row_builder,
519                                                    )
520                                                    .map(|row| row.cloned())
521                                                    .transpose()
522                                                {
523                                                    Some(Ok(row)) => ok_session.give((
524                                                        row,
525                                                        time,
526                                                        Tr::owned_diff(diff),
527                                                    )),
528                                                    Some(Err(err)) => err_session.give((
529                                                        err,
530                                                        time,
531                                                        Tr::owned_diff(diff),
532                                                    )),
533                                                    None => {}
534                                                }
535                                            } else {
536                                                let row = {
537                                                    row_builder.packer().extend(&*datums_local);
538                                                    row_builder.clone()
539                                                };
540                                                ok_session.give((row, time, Tr::owned_diff(diff)));
541                                            }
542                                        }
543                                    });
544                                    cursor.step_val(batch);
545                                }
546                                cursor.step_key(batch);
547                            }
548                        }
549                    });
550                })
551            });
552
553    (
554        ok_stream.as_collection(),
555        err_stream.as_collection().map(DataflowError::from),
556    )
557}