mz_compute/render/join/
linear_join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Rendering of linear join plans.
11//!
12//! Consult [LinearJoinPlan] documentation for details.
13
14use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
24use mz_dyncfg::ConfigSet;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
32use timely::dataflow::operators::OkErr;
33use timely::dataflow::scopes::Child;
34use timely::dataflow::{Scope, ScopeParent};
35use timely::progress::timestamp::Refines;
36
37use crate::extensions::arrange::MzArrangeCore;
38use crate::render::RenderTimestamp;
39use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
40use crate::render::join::mz_join_core::mz_join_core;
41use crate::row_spine::{RowRowBuilder, RowRowSpine};
42use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
43
/// Available linear join implementations.
///
/// See the `mz_join_core` module docs for our rationale for providing two join implementations.
#[derive(Clone, Copy)]
enum LinearJoinImpl {
    /// Render joins through `mz_join_core`, which accepts a yield function.
    Materialize,
    /// Render joins through the `join_core` method of the arrangement.
    DifferentialDataflow,
}
52
/// Specification of how linear joins are to be executed.
///
/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD
/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this.
///
/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390
#[derive(Clone, Copy)]
pub struct LinearJoinSpec {
    /// The join implementation used to render join operators.
    implementation: LinearJoinImpl,
    /// When a rendered join operator should yield; ignored by the
    /// `DifferentialDataflow` implementation.
    yielding: YieldSpec,
}
64
65impl Default for LinearJoinSpec {
66    fn default() -> Self {
67        Self {
68            implementation: LinearJoinImpl::Materialize,
69            yielding: Default::default(),
70        }
71    }
72}
73
74impl LinearJoinSpec {
75    /// Create a `LinearJoinSpec` based on the given config.
76    pub fn from_config(config: &ConfigSet) -> Self {
77        let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
78            LinearJoinImpl::Materialize
79        } else {
80            LinearJoinImpl::DifferentialDataflow
81        };
82
83        let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
84        let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
85            tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
86            YieldSpec::default()
87        });
88
89        Self {
90            implementation,
91            yielding,
92        }
93    }
94
95    /// Render a join operator according to this specification.
96    fn render<G, Tr1, Tr2, L, I>(
97        &self,
98        arranged1: &Arranged<G, Tr1>,
99        arranged2: &Arranged<G, Tr2>,
100        result: L,
101    ) -> VecCollection<G, I::Item, Diff>
102    where
103        G: Scope,
104        G::Timestamp: Lattice,
105        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
106        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
107            + Clone
108            + 'static,
109        L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
110        I: IntoIterator<Item: Data> + 'static,
111    {
112        use LinearJoinImpl::*;
113
114        match (
115            self.implementation,
116            self.yielding.after_work,
117            self.yielding.after_time,
118        ) {
119            (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
120            (Materialize, Some(work_limit), Some(time_limit)) => {
121                let yield_fn =
122                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
123                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
124            }
125            (Materialize, Some(work_limit), None) => {
126                let yield_fn = move |_start, work| work >= work_limit;
127                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
128            }
129            (Materialize, None, Some(time_limit)) => {
130                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
131                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
132            }
133            (Materialize, None, None) => {
134                let yield_fn = |_start, _work| false;
135                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
136            }
137        }
138    }
139}
140
/// Specification of a dataflow operator's yielding behavior.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield after the given amount of work was performed.
    after_work: Option<usize>,
    /// Yield after the given amount of time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// By default, yield after 1,000,000 units of work or after 100 milliseconds.
    fn default() -> Self {
        let after_work = Some(1_000_000);
        let after_time = Some(Duration::from_millis(100));
        Self {
            after_work,
            after_time,
        }
    }
}

impl YieldSpec {
    /// Parses a comma-separated list of `work:<amount>` and `time:<millis>` options.
    ///
    /// Whitespace around options, names, and values is ignored. A limit that is
    /// not mentioned stays `None`; if a limit is given more than once, the last
    /// occurrence wins. Returns `None` if any option is malformed, which includes
    /// the empty string.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            // A value that itself contains a `:` is rejected by the integer parsers below.
            let (name, value) = option.split_once(':')?;
            let value = value.trim();
            match name.trim() {
                "work" => spec.after_work = Some(value.parse::<usize>().ok()?),
                "time" => {
                    let millis = value.parse::<u64>().ok()?;
                    spec.after_time = Some(Duration::from_millis(millis));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
187
/// Different forms the streamed data might take.
///
/// `G` is the scope the data lives in; `T` is a timestamp that the scope's
/// timestamp refines, and is the timestamp of the `Trace` variant's arrangement.
enum JoinedFlavor<G, T>
where
    G: Scope,
    G::Timestamp: Refines<T> + MzTimestamp,
    T: MzTimestamp,
{
    /// Streamed data as a collection.
    Collection(VecCollection<G, Row, Diff>),
    /// A dataflow-local arrangement.
    Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
    /// An imported arrangement.
    Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
}
202
203impl<G, T> Context<G, T>
204where
205    G: Scope,
206    G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
207    T: MzTimestamp,
208{
209    pub(crate) fn render_join(
210        &self,
211        inputs: Vec<CollectionBundle<G, T>>,
212        linear_plan: LinearJoinPlan,
213    ) -> CollectionBundle<G, T> {
214        self.scope.clone().region_named("Join(Linear)", |inner| {
215            self.render_join_inner(inputs, linear_plan, inner)
216        })
217    }
218
    /// Renders `linear_plan` over `inputs` within the region `inner`.
    ///
    /// The source relation is taken either as an existing arrangement or as a
    /// collection (with the initial closure applied, if there is one). Each stage of
    /// the plan then joins in its lookup relation, and the final closure, if any, is
    /// applied to the result. All error streams encountered along the way are
    /// concatenated into the error collection of the returned bundle, which has
    /// left the region again.
    ///
    /// # Panics
    ///
    /// Panics if the join result is still arranged once all stages are rendered.
    fn render_join_inner(
        &self,
        inputs: Vec<CollectionBundle<G, T>>,
        linear_plan: LinearJoinPlan,
        inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
    ) -> CollectionBundle<G, T> {
        // Collect all error streams, and concatenate them at the end.
        let mut errors = Vec::new();

        // Determine which form our maintained spine of updates will initially take.
        // First, just check out the availability of an appropriate arrangement.
        // This will be `None` in the degenerate single-input join case, which ensures
        // that we do not panic if we never go around the `stage_plans` loop.
        let arrangement = linear_plan
            .stage_plans
            .get(0)
            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
        // We can use an arrangement if it exists and an initial closure does not.
        let mut joined = match (arrangement, linear_plan.initial_closure) {
            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
                JoinedFlavor::Local(oks.enter_region(inner))
            }
            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
                JoinedFlavor::Trace(oks.enter_region(inner))
            }
            // No usable arrangement, or an initial closure that must run first:
            // start from the source relation as a plain collection.
            (_, initial_closure) => {
                // TODO: extract closure from the first stage in the join plan, should it exist.
                // TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
                let (joined, errs) = inputs[linear_plan.source_relation]
                    .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
                errors.push(errs.enter_region(inner));
                let mut joined = joined.enter_region(inner);

                // In the current code this should always be `None`, but we have this here should
                // we change that and want to know what we should be doing.
                if let Some(closure) = initial_closure {
                    // If there is no starting arrangement, then we can run filters
                    // directly on the starting collection.
                    // If there is only one input, we are done joining, so run filters
                    let name = "LinearJoinInitialization";
                    type CB<C> = ConsolidatingContainerBuilder<C>;
                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                        // Reuseable allocation for unpacking.
                        let mut datums = DatumVec::new();
                        move |row| {
                            let mut row_builder = SharedRow::get();
                            let temp_storage = RowArena::new();
                            let mut datums_local = datums.borrow_with(&row);
                            // TODO(mcsherry): re-use `row` allocation.
                            // `transpose` turns the closure's `Result<Option<_>, _>` into
                            // the `Option<Result<_, _>>` returned here.
                            closure
                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
                                .map(|row| row.cloned())
                                .map_err(DataflowError::from)
                                .transpose()
                        }
                    });
                    joined = j;
                    errors.push(errs);
                }

                JoinedFlavor::Collection(joined)
            }
        };

        // progress through stages, updating partial results and errors.
        for stage_plan in linear_plan.stage_plans.into_iter() {
            // Different variants of `joined` implement this differently,
            // and the logic is centralized there.
            let stream = self.differential_join(
                joined,
                inputs[stage_plan.lookup_relation].enter_region(inner),
                stage_plan,
                &mut errors,
            );
            // Update joined results and capture any errors.
            joined = JoinedFlavor::Collection(stream);
        }

        // We have completed the join building, but may have work remaining.
        // For example, we may have expressions not pushed down (e.g. literals)
        // and projections that could not be applied (e.g. column repetition).
        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
            if let Some(closure) = linear_plan.final_closure {
                let name = "LinearJoinFinalization";
                type CB<C> = ConsolidatingContainerBuilder<C>;
                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
                    // Reuseable allocation for unpacking.
                    let mut datums = DatumVec::new();
                    move |row| {
                        let mut row_builder = SharedRow::get();
                        let temp_storage = RowArena::new();
                        let mut datums_local = datums.borrow_with(&row);
                        // TODO(mcsherry): re-use `row` allocation.
                        closure
                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
                            .map(|row| row.cloned())
                            .map_err(DataflowError::from)
                            .transpose()
                    }
                });

                joined = updates;
                errors.push(errs);
            }

            // Return joined results and all produced errors collected together.
            CollectionBundle::from_collections(
                joined,
                differential_dataflow::collection::concatenate(inner, errors),
            )
        } else {
            // Every stage leaves `joined` as a `Collection`, and an arrangement is
            // only chosen initially when a first stage exists.
            panic!("Unexpectedly arranged join output");
        };
        bundle.leave_region()
    }
336
337    /// Looks up the arrangement for the next input and joins it to the arranged
338    /// version of the join of previous inputs.
339    fn differential_join<S>(
340        &self,
341        mut joined: JoinedFlavor<S, T>,
342        lookup_relation: CollectionBundle<S, T>,
343        LinearStagePlan {
344            stream_key,
345            stream_thinning,
346            lookup_key,
347            closure,
348            lookup_relation: _,
349        }: LinearStagePlan,
350        errors: &mut Vec<VecCollection<S, DataflowError, Diff>>,
351    ) -> VecCollection<S, Row, Diff>
352    where
353        S: Scope<Timestamp = G::Timestamp>,
354    {
355        // If we have only a streamed collection, we must first form an arrangement.
356        if let JoinedFlavor::Collection(stream) = joined {
357            let name = "LinearJoinKeyPreparation";
358            let (keyed, errs) = stream
359                .inner
360                .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
361                    Pipeline,
362                    name,
363                    |_, _| {
364                        Box::new(move |input, ok, errs| {
365                            let mut temp_storage = RowArena::new();
366                            let mut key_buf = Row::default();
367                            let mut val_buf = Row::default();
368                            let mut datums = DatumVec::new();
369                            while let Some((time, data)) = input.next() {
370                                let mut ok_session = ok.session_with_builder(&time);
371                                let mut err_session = errs.session(&time);
372                                for (row, time, diff) in data.iter() {
373                                    temp_storage.clear();
374                                    let datums_local = datums.borrow_with(row);
375                                    let datums = stream_key
376                                        .iter()
377                                        .map(|e| e.eval(&datums_local, &temp_storage));
378                                    let result = key_buf.packer().try_extend(datums);
379                                    match result {
380                                        Ok(()) => {
381                                            val_buf.packer().extend(
382                                                stream_thinning.iter().map(|e| datums_local[*e]),
383                                            );
384                                            ok_session.give(((&key_buf, &val_buf), time, diff));
385                                        }
386                                        Err(e) => {
387                                            err_session.give((e.into(), time.clone(), *diff));
388                                        }
389                                    }
390                                }
391                            }
392                        })
393                    },
394                );
395
396            errors.push(errs.as_collection());
397
398            let arranged = keyed
399                .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
400                    ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
401                );
402            joined = JoinedFlavor::Local(arranged);
403        }
404
405        // Demultiplex the four different cross products of arrangement types we might have.
406        let arrangement = lookup_relation
407            .arrangement(&lookup_key[..])
408            .expect("Arrangement absent despite explicit construction");
409
410        match joined {
411            JoinedFlavor::Collection(_) => {
412                unreachable!("JoinedFlavor::VecCollection variant avoided at top of method");
413            }
414            JoinedFlavor::Local(local) => match arrangement {
415                ArrangementFlavor::Local(oks, errs1) => {
416                    let (oks, errs2) = self
417                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
418                            local, oks, closure,
419                        );
420
421                    errors.push(errs1.as_collection(|k, _v| k.clone()));
422                    errors.extend(errs2);
423                    oks
424                }
425                ArrangementFlavor::Trace(_gid, oks, errs1) => {
426                    let (oks, errs2) = self
427                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
428                            local, oks, closure,
429                        );
430
431                    errors.push(errs1.as_collection(|k, _v| k.clone()));
432                    errors.extend(errs2);
433                    oks
434                }
435            },
436            JoinedFlavor::Trace(trace) => match arrangement {
437                ArrangementFlavor::Local(oks, errs1) => {
438                    let (oks, errs2) = self
439                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
440                            trace, oks, closure,
441                        );
442
443                    errors.push(errs1.as_collection(|k, _v| k.clone()));
444                    errors.extend(errs2);
445                    oks
446                }
447                ArrangementFlavor::Trace(_gid, oks, errs1) => {
448                    let (oks, errs2) = self
449                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
450                            trace, oks, closure,
451                        );
452
453                    errors.push(errs1.as_collection(|k, _v| k.clone()));
454                    errors.extend(errs2);
455                    oks
456                }
457            },
458        }
459    }
460
461    /// Joins the arrangement for `next_input` to the arranged version of the
462    /// join of previous inputs. This is split into its own method to enable
463    /// reuse of code with different types of `next_input`.
464    ///
465    /// The return type includes an optional error collection, which may be
466    /// `None` if we can determine that `closure` cannot error.
467    fn differential_join_inner<S, Tr1, Tr2>(
468        &self,
469        prev_keyed: Arranged<S, Tr1>,
470        next_input: Arranged<S, Tr2>,
471        closure: JoinClosure,
472    ) -> (
473        VecCollection<S, Row, Diff>,
474        Option<VecCollection<S, DataflowError, Diff>>,
475    )
476    where
477        S: Scope<Timestamp = G::Timestamp>,
478        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
479        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
480            + Clone
481            + 'static,
482        for<'a> Tr1::Key<'a>: ToDatumIter,
483        for<'a> Tr1::Val<'a>: ToDatumIter,
484        for<'a> Tr2::Val<'a>: ToDatumIter,
485    {
486        // Reuseable allocation for unpacking.
487        let mut datums = DatumVec::new();
488
489        if closure.could_error() {
490            let (oks, err) = self
491                .linear_join_spec
492                .render(&prev_keyed, &next_input, move |key, old, new| {
493                    let mut row_builder = SharedRow::get();
494                    let temp_storage = RowArena::new();
495
496                    let mut datums_local = datums.borrow();
497                    datums_local.extend(key.to_datum_iter());
498                    datums_local.extend(old.to_datum_iter());
499                    datums_local.extend(new.to_datum_iter());
500
501                    closure
502                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
503                        .map(|row| row.cloned())
504                        .map_err(DataflowError::from)
505                        .transpose()
506                })
507                .inner
508                .ok_err(|(x, t, d)| {
509                    // TODO(mcsherry): consider `ok_err()` for `Collection`.
510                    match x {
511                        Ok(x) => Ok((x, t, d)),
512                        Err(x) => Err((x, t, d)),
513                    }
514                });
515
516            (oks.as_collection(), Some(err.as_collection()))
517        } else {
518            let oks =
519                self.linear_join_spec
520                    .render(&prev_keyed, &next_input, move |key, old, new| {
521                        let mut row_builder = SharedRow::get();
522                        let temp_storage = RowArena::new();
523
524                        let mut datums_local = datums.borrow();
525                        datums_local.extend(key.to_datum_iter());
526                        datums_local.extend(old.to_datum_iter());
527                        datums_local.extend(new.to_datum_iter());
528
529                        closure
530                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
531                            .expect("Closure claimed to never error")
532                            .cloned()
533                    });
534
535            (oks, None)
536        }
537    }
538}