mz_compute/render/join/
linear_join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Rendering of linear join plans.
11//!
12//! Consult [LinearJoinPlan] documentation for details.
13
14use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Collection, Data};
21use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
24use mz_dyncfg::ConfigSet;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
32use timely::dataflow::operators::OkErr;
33use timely::dataflow::scopes::Child;
34use timely::dataflow::{Scope, ScopeParent};
35use timely::progress::timestamp::Refines;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
40use crate::render::join::mz_join_core::mz_join_core;
41use crate::row_spine::{RowRowBuilder, RowRowSpine};
42use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
43
/// The linear join implementations that can be selected.
///
/// The `mz_join_core` module docs explain why two implementations are provided.
#[derive(Clone, Copy)]
enum LinearJoinImpl {
    /// Materialize's own join core (`mz_join_core`), which honors the configured yielding.
    Materialize,
    /// Differential Dataflow's `join_core` operator.
    DifferentialDataflow,
}
52
53/// Specification of how linear joins are to be executed.
54///
55/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD
56/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this.
57///
58/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390
59#[derive(Clone, Copy)]
60pub struct LinearJoinSpec {
61    implementation: LinearJoinImpl,
62    yielding: YieldSpec,
63}
64
65impl Default for LinearJoinSpec {
66    fn default() -> Self {
67        Self {
68            implementation: LinearJoinImpl::Materialize,
69            yielding: Default::default(),
70        }
71    }
72}
73
74impl LinearJoinSpec {
75    /// Create a `LinearJoinSpec` based on the given config.
76    pub fn from_config(config: &ConfigSet) -> Self {
77        let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
78            LinearJoinImpl::Materialize
79        } else {
80            LinearJoinImpl::DifferentialDataflow
81        };
82
83        let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
84        let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
85            tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
86            YieldSpec::default()
87        });
88
89        Self {
90            implementation,
91            yielding,
92        }
93    }
94
95    /// Render a join operator according to this specification.
96    fn render<G, Tr1, Tr2, L, I>(
97        &self,
98        arranged1: &Arranged<G, Tr1>,
99        arranged2: &Arranged<G, Tr2>,
100        shutdown_probe: ShutdownProbe,
101        result: L,
102    ) -> Collection<G, I::Item, Diff>
103    where
104        G: Scope,
105        G::Timestamp: Lattice,
106        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
107        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
108            + Clone
109            + 'static,
110        L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
111        I: IntoIterator<Item: Data> + 'static,
112    {
113        use LinearJoinImpl::*;
114
115        match (
116            self.implementation,
117            self.yielding.after_work,
118            self.yielding.after_time,
119        ) {
120            (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
121            (Materialize, Some(work_limit), Some(time_limit)) => {
122                let yield_fn =
123                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
124                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
125            }
126            (Materialize, Some(work_limit), None) => {
127                let yield_fn = move |_start, work| work >= work_limit;
128                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
129            }
130            (Materialize, None, Some(time_limit)) => {
131                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
132                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
133            }
134            (Materialize, None, None) => {
135                let yield_fn = |_start, _work| false;
136                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
137            }
138        }
139    }
140}
141
/// When a dataflow operator should yield control back to the scheduler.
///
/// Both limits are optional and independent of each other.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield once at least this much work has been performed.
    after_work: Option<usize>,
    /// Yield once at least this much time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Yields after 1,000,000 units of work or after 100 milliseconds.
    fn default() -> Self {
        Self {
            after_work: Some(1_000_000),
            after_time: Some(Duration::from_millis(100)),
        }
    }
}

impl YieldSpec {
    /// Parses a yielding specification such as `"work:1000000,time:100"`.
    ///
    /// The input is a comma-separated list of `key:value` options; whitespace around each
    /// option, key, and value is ignored. Recognized options are `work:<amount>` (parsed as
    /// `usize`) and `time:<millis>` (milliseconds, parsed as `u64`). A repeated option keeps
    /// its last value, and an omitted option stays unset.
    ///
    /// Returns `None` if any option is malformed, uses an unknown key, or carries a value that
    /// does not parse. The empty string is rejected as well.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',').map(str::trim) {
            // A missing ':' rejects the option; extra ':'s end up in `value` and fail to parse.
            let (key, value) = option.split_once(':')?;
            match (key.trim(), value.trim()) {
                ("work", amount) => {
                    let amount: usize = amount.parse().ok()?;
                    spec.after_work = Some(amount);
                }
                ("time", millis) => {
                    let millis: u64 = millis.parse().ok()?;
                    spec.after_time = Some(Duration::from_millis(millis));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
188
189/// Different forms the streamed data might take.
190enum JoinedFlavor<G, T>
191where
192    G: Scope,
193    G::Timestamp: Refines<T> + MzTimestamp,
194    T: MzTimestamp,
195{
196    /// Streamed data as a collection.
197    Collection(Collection<G, Row, Diff>),
198    /// A dataflow-local arrangement.
199    Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
200    /// An imported arrangement.
201    Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
202}
203
204impl<G, T> Context<G, T>
205where
206    G: Scope,
207    G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
208    T: MzTimestamp,
209{
210    pub(crate) fn render_join(
211        &self,
212        inputs: Vec<CollectionBundle<G, T>>,
213        linear_plan: LinearJoinPlan,
214    ) -> CollectionBundle<G, T> {
215        self.scope.clone().region_named("Join(Linear)", |inner| {
216            self.render_join_inner(inputs, linear_plan, inner)
217        })
218    }
219
220    fn render_join_inner(
221        &self,
222        inputs: Vec<CollectionBundle<G, T>>,
223        linear_plan: LinearJoinPlan,
224        inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
225    ) -> CollectionBundle<G, T> {
226        // Collect all error streams, and concatenate them at the end.
227        let mut errors = Vec::new();
228
229        // Determine which form our maintained spine of updates will initially take.
230        // First, just check out the availability of an appropriate arrangement.
231        // This will be `None` in the degenerate single-input join case, which ensures
232        // that we do not panic if we never go around the `stage_plans` loop.
233        let arrangement = linear_plan
234            .stage_plans
235            .get(0)
236            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
237        // We can use an arrangement if it exists and an initial closure does not.
238        let mut joined = match (arrangement, linear_plan.initial_closure) {
239            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
240                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
241                JoinedFlavor::Local(oks.enter_region(inner))
242            }
243            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
244                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
245                JoinedFlavor::Trace(oks.enter_region(inner))
246            }
247            (_, initial_closure) => {
248                // TODO: extract closure from the first stage in the join plan, should it exist.
249                // TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
250                let (joined, errs) = inputs[linear_plan.source_relation]
251                    .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
252                errors.push(errs.enter_region(inner));
253                let mut joined = joined.enter_region(inner);
254
255                // In the current code this should always be `None`, but we have this here should
256                // we change that and want to know what we should be doing.
257                if let Some(closure) = initial_closure {
258                    // If there is no starting arrangement, then we can run filters
259                    // directly on the starting collection.
260                    // If there is only one input, we are done joining, so run filters
261                    let name = "LinearJoinInitialization";
262                    type CB<C> = ConsolidatingContainerBuilder<C>;
263                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
264                        // Reuseable allocation for unpacking.
265                        let mut datums = DatumVec::new();
266                        move |row| {
267                            let mut row_builder = SharedRow::get();
268                            let temp_storage = RowArena::new();
269                            let mut datums_local = datums.borrow_with(&row);
270                            // TODO(mcsherry): re-use `row` allocation.
271                            closure
272                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
273                                .map(|row| row.cloned())
274                                .map_err(DataflowError::from)
275                                .transpose()
276                        }
277                    });
278                    joined = j;
279                    errors.push(errs);
280                }
281
282                JoinedFlavor::Collection(joined)
283            }
284        };
285
286        // progress through stages, updating partial results and errors.
287        for stage_plan in linear_plan.stage_plans.into_iter() {
288            // Different variants of `joined` implement this differently,
289            // and the logic is centralized there.
290            let stream = self.differential_join(
291                joined,
292                inputs[stage_plan.lookup_relation].enter_region(inner),
293                stage_plan,
294                &mut errors,
295            );
296            // Update joined results and capture any errors.
297            joined = JoinedFlavor::Collection(stream);
298        }
299
300        // We have completed the join building, but may have work remaining.
301        // For example, we may have expressions not pushed down (e.g. literals)
302        // and projections that could not be applied (e.g. column repetition).
303        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
304            if let Some(closure) = linear_plan.final_closure {
305                let name = "LinearJoinFinalization";
306                type CB<C> = ConsolidatingContainerBuilder<C>;
307                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
308                    // Reuseable allocation for unpacking.
309                    let mut datums = DatumVec::new();
310                    move |row| {
311                        let mut row_builder = SharedRow::get();
312                        let temp_storage = RowArena::new();
313                        let mut datums_local = datums.borrow_with(&row);
314                        // TODO(mcsherry): re-use `row` allocation.
315                        closure
316                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
317                            .map(|row| row.cloned())
318                            .map_err(DataflowError::from)
319                            .transpose()
320                    }
321                });
322
323                joined = updates;
324                errors.push(errs);
325            }
326
327            // Return joined results and all produced errors collected together.
328            CollectionBundle::from_collections(
329                joined,
330                differential_dataflow::collection::concatenate(inner, errors),
331            )
332        } else {
333            panic!("Unexpectedly arranged join output");
334        };
335        bundle.leave_region()
336    }
337
338    /// Looks up the arrangement for the next input and joins it to the arranged
339    /// version of the join of previous inputs.
340    fn differential_join<S>(
341        &self,
342        mut joined: JoinedFlavor<S, T>,
343        lookup_relation: CollectionBundle<S, T>,
344        LinearStagePlan {
345            stream_key,
346            stream_thinning,
347            lookup_key,
348            closure,
349            lookup_relation: _,
350        }: LinearStagePlan,
351        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
352    ) -> Collection<S, Row, Diff>
353    where
354        S: Scope<Timestamp = G::Timestamp>,
355    {
356        // If we have only a streamed collection, we must first form an arrangement.
357        if let JoinedFlavor::Collection(stream) = joined {
358            let name = "LinearJoinKeyPreparation";
359            let (keyed, errs) = stream
360                .inner
361                .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
362                    Pipeline,
363                    name,
364                    |_, _| {
365                        Box::new(move |input, ok, errs| {
366                            let mut temp_storage = RowArena::new();
367                            let mut key_buf = Row::default();
368                            let mut val_buf = Row::default();
369                            let mut datums = DatumVec::new();
370                            while let Some((time, data)) = input.next() {
371                                let mut ok_session = ok.session_with_builder(&time);
372                                let mut err_session = errs.session(&time);
373                                for (row, time, diff) in data.iter() {
374                                    temp_storage.clear();
375                                    let datums_local = datums.borrow_with(row);
376                                    let datums = stream_key
377                                        .iter()
378                                        .map(|e| e.eval(&datums_local, &temp_storage));
379                                    let result = key_buf.packer().try_extend(datums);
380                                    match result {
381                                        Ok(()) => {
382                                            val_buf.packer().extend(
383                                                stream_thinning.iter().map(|e| datums_local[*e]),
384                                            );
385                                            ok_session.give(((&key_buf, &val_buf), time, diff));
386                                        }
387                                        Err(e) => {
388                                            err_session.give((e.into(), time.clone(), *diff));
389                                        }
390                                    }
391                                }
392                            }
393                        })
394                    },
395                );
396
397            errors.push(errs.as_collection());
398
399            let arranged = keyed
400                .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
401                    ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
402                );
403            joined = JoinedFlavor::Local(arranged);
404        }
405
406        // Demultiplex the four different cross products of arrangement types we might have.
407        let arrangement = lookup_relation
408            .arrangement(&lookup_key[..])
409            .expect("Arrangement absent despite explicit construction");
410
411        match joined {
412            JoinedFlavor::Collection(_) => {
413                unreachable!("JoinedFlavor::Collection variant avoided at top of method");
414            }
415            JoinedFlavor::Local(local) => match arrangement {
416                ArrangementFlavor::Local(oks, errs1) => {
417                    let (oks, errs2) = self
418                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
419                            local, oks, closure,
420                        );
421
422                    errors.push(errs1.as_collection(|k, _v| k.clone()));
423                    errors.extend(errs2);
424                    oks
425                }
426                ArrangementFlavor::Trace(_gid, oks, errs1) => {
427                    let (oks, errs2) = self
428                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
429                            local, oks, closure,
430                        );
431
432                    errors.push(errs1.as_collection(|k, _v| k.clone()));
433                    errors.extend(errs2);
434                    oks
435                }
436            },
437            JoinedFlavor::Trace(trace) => match arrangement {
438                ArrangementFlavor::Local(oks, errs1) => {
439                    let (oks, errs2) = self
440                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
441                            trace, oks, closure,
442                        );
443
444                    errors.push(errs1.as_collection(|k, _v| k.clone()));
445                    errors.extend(errs2);
446                    oks
447                }
448                ArrangementFlavor::Trace(_gid, oks, errs1) => {
449                    let (oks, errs2) = self
450                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
451                            trace, oks, closure,
452                        );
453
454                    errors.push(errs1.as_collection(|k, _v| k.clone()));
455                    errors.extend(errs2);
456                    oks
457                }
458            },
459        }
460    }
461
462    /// Joins the arrangement for `next_input` to the arranged version of the
463    /// join of previous inputs. This is split into its own method to enable
464    /// reuse of code with different types of `next_input`.
465    ///
466    /// The return type includes an optional error collection, which may be
467    /// `None` if we can determine that `closure` cannot error.
468    fn differential_join_inner<S, Tr1, Tr2>(
469        &self,
470        prev_keyed: Arranged<S, Tr1>,
471        next_input: Arranged<S, Tr2>,
472        closure: JoinClosure,
473    ) -> (
474        Collection<S, Row, Diff>,
475        Option<Collection<S, DataflowError, Diff>>,
476    )
477    where
478        S: Scope<Timestamp = G::Timestamp>,
479        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
480        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
481            + Clone
482            + 'static,
483        for<'a> Tr1::Key<'a>: ToDatumIter,
484        for<'a> Tr1::Val<'a>: ToDatumIter,
485        for<'a> Tr2::Val<'a>: ToDatumIter,
486    {
487        // Reuseable allocation for unpacking.
488        let mut datums = DatumVec::new();
489
490        if closure.could_error() {
491            let (oks, err) = self
492                .linear_join_spec
493                .render(
494                    &prev_keyed,
495                    &next_input,
496                    self.shutdown_probe.clone(),
497                    move |key, old, new| {
498                        let mut row_builder = SharedRow::get();
499                        let temp_storage = RowArena::new();
500
501                        let mut datums_local = datums.borrow();
502                        datums_local.extend(key.to_datum_iter());
503                        datums_local.extend(old.to_datum_iter());
504                        datums_local.extend(new.to_datum_iter());
505
506                        closure
507                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
508                            .map(|row| row.cloned())
509                            .map_err(DataflowError::from)
510                            .transpose()
511                    },
512                )
513                .inner
514                .ok_err(|(x, t, d)| {
515                    // TODO(mcsherry): consider `ok_err()` for `Collection`.
516                    match x {
517                        Ok(x) => Ok((x, t, d)),
518                        Err(x) => Err((x, t, d)),
519                    }
520                });
521
522            (oks.as_collection(), Some(err.as_collection()))
523        } else {
524            let oks = self.linear_join_spec.render(
525                &prev_keyed,
526                &next_input,
527                self.shutdown_probe.clone(),
528                move |key, old, new| {
529                    let mut row_builder = SharedRow::get();
530                    let temp_storage = RowArena::new();
531
532                    let mut datums_local = datums.borrow();
533                    datums_local.extend(key.to_datum_iter());
534                    datums_local.extend(old.to_datum_iter());
535                    datums_local.extend(new.to_datum_iter());
536
537                    closure
538                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
539                        .expect("Closure claimed to never error")
540                        .cloned()
541                },
542            );
543
544            (oks, None)
545        }
546    }
547}