Skip to main content

mz_compute/render/join/
linear_join.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Rendering of linear join plans.
//!
//! Consult [LinearJoinPlan] documentation for details.

14use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
22use mz_compute_types::plan::join::JoinClosure;
23use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
24use mz_dyncfg::ConfigSet;
25use mz_repr::fixed_length::ToDatumIter;
26use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
27use mz_storage_types::errors::DataflowError;
28use mz_timely_util::columnar::builder::ColumnBuilder;
29use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
30use mz_timely_util::operator::{CollectionExt, StreamExt};
31use timely::dataflow::Scope;
32use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
33use timely::dataflow::operators::OkErr;
34
35use crate::extensions::arrange::MzArrangeCore;
36use crate::render::RenderTimestamp;
37use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
38use crate::render::join::mz_join_core::mz_join_core;
39use crate::row_spine::{RowRowBuilder, RowRowSpine};
40use crate::typedefs::{RowRowAgent, RowRowEnter};
41
/// The join operator implementations a linear join can be rendered with.
///
/// The `mz_join_core` module docs explain why two join implementations are provided.
#[derive(Copy, Clone)]
enum LinearJoinImpl {
    /// Render joins with `mz_join_core`.
    Materialize,
    /// Render joins with the `join_core` operator of the arrangement.
    DifferentialDataflow,
}
50
51/// Specification of how linear joins are to be executed.
52///
53/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD
54/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this.
55///
56/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390
57#[derive(Clone, Copy)]
58pub struct LinearJoinSpec {
59    implementation: LinearJoinImpl,
60    yielding: YieldSpec,
61}
62
63impl Default for LinearJoinSpec {
64    fn default() -> Self {
65        Self {
66            implementation: LinearJoinImpl::Materialize,
67            yielding: Default::default(),
68        }
69    }
70}
71
72impl LinearJoinSpec {
73    /// Create a `LinearJoinSpec` based on the given config.
74    pub fn from_config(config: &ConfigSet) -> Self {
75        let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
76            LinearJoinImpl::Materialize
77        } else {
78            LinearJoinImpl::DifferentialDataflow
79        };
80
81        let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
82        let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
83            tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
84            YieldSpec::default()
85        });
86
87        Self {
88            implementation,
89            yielding,
90        }
91    }
92
93    /// Render a join operator according to this specification.
94    fn render<'s, T, Tr1, Tr2, L, I>(
95        &self,
96        arranged1: Arranged<'s, Tr1>,
97        arranged2: Arranged<'s, Tr2>,
98        result: L,
99    ) -> VecCollection<'s, T, I::Item, Diff>
100    where
101        T: Lattice + timely::progress::Timestamp,
102        Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
103        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
104        L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
105        I: IntoIterator<Item: Data> + 'static,
106    {
107        use LinearJoinImpl::*;
108
109        match (
110            self.implementation,
111            self.yielding.after_work,
112            self.yielding.after_time,
113        ) {
114            (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
115            (Materialize, Some(work_limit), Some(time_limit)) => {
116                let yield_fn =
117                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
118                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
119            }
120            (Materialize, Some(work_limit), None) => {
121                let yield_fn = move |_start, work| work >= work_limit;
122                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
123            }
124            (Materialize, None, Some(time_limit)) => {
125                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
126                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
127            }
128            (Materialize, None, None) => {
129                let yield_fn = |_start, _work| false;
130                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
131            }
132        }
133    }
134}
135
/// Describes when a dataflow operator should yield.
#[derive(Copy, Clone)]
struct YieldSpec {
    /// Yield once this amount of work has been performed.
    after_work: Option<usize>,
    /// Yield once this amount of time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Sets a work limit of 1,000,000 and a time limit of 100 milliseconds.
    fn default() -> Self {
        Self {
            after_work: Some(1_000_000),
            after_time: Some(Duration::from_millis(100)),
        }
    }
}

impl YieldSpec {
    /// Parses a yield specification from a comma-separated list of `name:value` options.
    ///
    /// Recognized options are `work:<amount>` and `time:<milliseconds>`. Whitespace around
    /// options, names, and values is ignored. When an option occurs more than once, the last
    /// occurrence wins. An option that does not occur leaves its limit unset.
    ///
    /// Returns `None` if any option is malformed, has an unknown name, or has a value that
    /// does not parse as a number. This includes the empty string.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            // An option without a `:` is malformed. Surplus `:` separators end up in `value`
            // and are rejected by the numeric parse below.
            let (name, value) = option.split_once(':')?;
            let value = value.trim();
            match name.trim() {
                "work" => spec.after_work = Some(value.parse().ok()?),
                "time" => spec.after_time = Some(Duration::from_millis(value.parse().ok()?)),
                _ => return None,
            }
        }

        Some(spec)
    }
}
182
183/// Different forms the streamed data might take.
184enum JoinedFlavor<'scope, T: RenderTimestamp> {
185    /// Streamed data as a collection.
186    Collection(VecCollection<'scope, T, Row, Diff>),
187    /// A dataflow-local arrangement.
188    Local(Arranged<'scope, RowRowAgent<T, Diff>>),
189    /// An imported arrangement.
190    Trace(Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>),
191}
192
193impl<'scope, T> Context<'scope, T>
194where
195    T: Lattice + RenderTimestamp,
196{
197    pub(crate) fn render_join(
198        &self,
199        inputs: Vec<CollectionBundle<'scope, T>>,
200        linear_plan: LinearJoinPlan,
201    ) -> CollectionBundle<'scope, T> {
202        self.scope.clone().region_named("Join(Linear)", |inner| {
203            self.render_join_inner(inputs, linear_plan, inner)
204        })
205    }
206
207    fn render_join_inner(
208        &self,
209        inputs: Vec<CollectionBundle<'scope, T>>,
210        linear_plan: LinearJoinPlan,
211        inner: Scope<'_, T>,
212    ) -> CollectionBundle<'scope, T> {
213        // Collect all error streams, and concatenate them at the end.
214        let mut errors = Vec::new();
215
216        // Determine which form our maintained spine of updates will initially take.
217        // First, just check out the availability of an appropriate arrangement.
218        // This will be `None` in the degenerate single-input join case, which ensures
219        // that we do not panic if we never go around the `stage_plans` loop.
220        let arrangement = linear_plan
221            .stage_plans
222            .get(0)
223            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
224        // We can use an arrangement if it exists and an initial closure does not.
225        let mut joined = match (arrangement, linear_plan.initial_closure) {
226            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
227                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
228                JoinedFlavor::Local(oks.enter_region(inner))
229            }
230            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
231                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
232                JoinedFlavor::Trace(oks.enter_region(inner))
233            }
234            (_, initial_closure) => {
235                // TODO: extract closure from the first stage in the join plan, should it exist.
236                // TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
237                let (joined, errs) = inputs[linear_plan.source_relation]
238                    .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
239                errors.push(errs.enter_region(inner));
240                let mut joined = joined.enter_region(inner);
241
242                // In the current code this should always be `None`, but we have this here should
243                // we change that and want to know what we should be doing.
244                if let Some(closure) = initial_closure {
245                    // If there is no starting arrangement, then we can run filters
246                    // directly on the starting collection.
247                    // If there is only one input, we are done joining, so run filters
248                    let name = "LinearJoinInitialization";
249                    type CB<C> = ConsolidatingContainerBuilder<C>;
250                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
251                        // Reuseable allocation for unpacking.
252                        let mut datums = DatumVec::new();
253                        move |row| {
254                            let mut row_builder = SharedRow::get();
255                            let temp_storage = RowArena::new();
256                            let mut datums_local = datums.borrow_with(&row);
257                            // TODO(mcsherry): re-use `row` allocation.
258                            closure
259                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
260                                .map(|row| row.cloned())
261                                .map_err(DataflowError::from)
262                                .transpose()
263                        }
264                    });
265                    joined = j;
266                    errors.push(errs);
267                }
268
269                JoinedFlavor::Collection(joined)
270            }
271        };
272
273        // progress through stages, updating partial results and errors.
274        for stage_plan in linear_plan.stage_plans.into_iter() {
275            // Different variants of `joined` implement this differently,
276            // and the logic is centralized there.
277            let stream = self.differential_join(
278                joined,
279                inputs[stage_plan.lookup_relation].enter_region(inner),
280                stage_plan,
281                &mut errors,
282            );
283            // Update joined results and capture any errors.
284            joined = JoinedFlavor::Collection(stream);
285        }
286
287        // We have completed the join building, but may have work remaining.
288        // For example, we may have expressions not pushed down (e.g. literals)
289        // and projections that could not be applied (e.g. column repetition).
290        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
291            if let Some(closure) = linear_plan.final_closure {
292                let name = "LinearJoinFinalization";
293                type CB<C> = ConsolidatingContainerBuilder<C>;
294                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
295                    // Reuseable allocation for unpacking.
296                    let mut datums = DatumVec::new();
297                    move |row| {
298                        let mut row_builder = SharedRow::get();
299                        let temp_storage = RowArena::new();
300                        let mut datums_local = datums.borrow_with(&row);
301                        // TODO(mcsherry): re-use `row` allocation.
302                        closure
303                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
304                            .map(|row| row.cloned())
305                            .map_err(DataflowError::from)
306                            .transpose()
307                    }
308                });
309
310                joined = updates;
311                errors.push(errs);
312            }
313
314            // Return joined results and all produced errors collected together.
315            CollectionBundle::from_collections(
316                joined,
317                differential_dataflow::collection::concatenate(inner, errors),
318            )
319        } else {
320            panic!("Unexpectedly arranged join output");
321        };
322        bundle.leave_region(self.scope)
323    }
324
325    /// Looks up the arrangement for the next input and joins it to the arranged
326    /// version of the join of previous inputs.
327    fn differential_join<'s>(
328        &self,
329        mut joined: JoinedFlavor<'s, T>,
330        lookup_relation: CollectionBundle<'s, T>,
331        LinearStagePlan {
332            stream_key,
333            stream_thinning,
334            lookup_key,
335            closure,
336            lookup_relation: _,
337        }: LinearStagePlan,
338        errors: &mut Vec<VecCollection<'s, T, DataflowError, Diff>>,
339    ) -> VecCollection<'s, T, Row, Diff> {
340        // If we have only a streamed collection, we must first form an arrangement.
341        if let JoinedFlavor::Collection(stream) = joined {
342            let name = "LinearJoinKeyPreparation";
343            let (keyed, errs) = stream
344                .inner
345                .unary_fallible::<ColumnBuilder<((Row, Row), T, Diff)>, _, _, _>(
346                    Pipeline,
347                    name,
348                    |_, _| {
349                        Box::new(move |input, ok, errs| {
350                            let mut temp_storage = RowArena::new();
351                            let mut key_buf = Row::default();
352                            let mut val_buf = Row::default();
353                            let mut datums = DatumVec::new();
354                            input.for_each(|time, data| {
355                                let mut ok_session = ok.session_with_builder(&time);
356                                let mut err_session = errs.session(&time);
357                                for (row, time, diff) in data.iter() {
358                                    temp_storage.clear();
359                                    let datums_local = datums.borrow_with(row);
360                                    let datums = stream_key
361                                        .iter()
362                                        .map(|e| e.eval(&datums_local, &temp_storage));
363                                    let result = key_buf.packer().try_extend(datums);
364                                    match result {
365                                        Ok(()) => {
366                                            val_buf.packer().extend(
367                                                stream_thinning.iter().map(|e| datums_local[*e]),
368                                            );
369                                            ok_session.give(((&key_buf, &val_buf), time, diff));
370                                        }
371                                        Err(e) => {
372                                            err_session.give((e.into(), time.clone(), *diff));
373                                        }
374                                    }
375                                }
376                            });
377                        })
378                    },
379                );
380
381            errors.push(errs.as_collection());
382
383            let arranged = keyed
384                .mz_arrange_core::<
385                    _,
386                    Col2ValBatcher<_, _, _, _>,
387                    RowRowBuilder<_, _>,
388                    RowRowSpine<_, _>,
389                >(
390                    ExchangeCore::<ColumnBuilder<_>, _>::new_core(
391                        columnar_exchange::<Row, Row, T, Diff>,
392                    ),
393                    "JoinStage"
394                );
395            joined = JoinedFlavor::Local(arranged);
396        }
397
398        // Demultiplex the four different cross products of arrangement types we might have.
399        let arrangement = lookup_relation
400            .arrangement(&lookup_key[..])
401            .expect("Arrangement absent despite explicit construction");
402
403        match joined {
404            JoinedFlavor::Collection(_) => {
405                unreachable!("JoinedFlavor::VecCollection variant avoided at top of method");
406            }
407            JoinedFlavor::Local(local) => match arrangement {
408                ArrangementFlavor::Local(oks, errs1) => {
409                    let (oks, errs2) = self
410                        .differential_join_inner::<RowRowAgent<_, _>, RowRowAgent<_, _>>(
411                            local, oks, closure,
412                        );
413
414                    errors.push(errs1.as_collection(|k, _v| k.clone()));
415                    errors.extend(errs2);
416                    oks
417                }
418                ArrangementFlavor::Trace(_gid, oks, errs1) => {
419                    let (oks, errs2) = self
420                        .differential_join_inner::<RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
421                            local, oks, closure,
422                        );
423
424                    errors.push(errs1.as_collection(|k, _v| k.clone()));
425                    errors.extend(errs2);
426                    oks
427                }
428            },
429            JoinedFlavor::Trace(trace) => match arrangement {
430                ArrangementFlavor::Local(oks, errs1) => {
431                    let (oks, errs2) = self
432                        .differential_join_inner::<RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
433                            trace, oks, closure,
434                        );
435
436                    errors.push(errs1.as_collection(|k, _v| k.clone()));
437                    errors.extend(errs2);
438                    oks
439                }
440                ArrangementFlavor::Trace(_gid, oks, errs1) => {
441                    let (oks, errs2) = self
442                        .differential_join_inner::<RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
443                            trace, oks, closure,
444                        );
445
446                    errors.push(errs1.as_collection(|k, _v| k.clone()));
447                    errors.extend(errs2);
448                    oks
449                }
450            },
451        }
452    }
453
454    /// Joins the arrangement for `next_input` to the arranged version of the
455    /// join of previous inputs. This is split into its own method to enable
456    /// reuse of code with different types of `next_input`.
457    ///
458    /// The return type includes an optional error collection, which may be
459    /// `None` if we can determine that `closure` cannot error.
460    fn differential_join_inner<'s, Tr1, Tr2>(
461        &self,
462        prev_keyed: Arranged<'s, Tr1>,
463        next_input: Arranged<'s, Tr2>,
464        closure: JoinClosure,
465    ) -> (
466        VecCollection<'s, T, Row, Diff>,
467        Option<VecCollection<'s, T, DataflowError, Diff>>,
468    )
469    where
470        Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
471        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
472        for<'a> Tr1::Key<'a>: ToDatumIter,
473        for<'a> Tr1::Val<'a>: ToDatumIter,
474        for<'a> Tr2::Val<'a>: ToDatumIter,
475    {
476        // Reuseable allocation for unpacking.
477        let mut datums = DatumVec::new();
478
479        if closure.could_error() {
480            let (oks, err) = self
481                .linear_join_spec
482                .render(prev_keyed, next_input, move |key, old, new| {
483                    let mut row_builder = SharedRow::get();
484                    let temp_storage = RowArena::new();
485
486                    let mut datums_local = datums.borrow();
487                    datums_local.extend(key.to_datum_iter());
488                    datums_local.extend(old.to_datum_iter());
489                    datums_local.extend(new.to_datum_iter());
490
491                    closure
492                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
493                        .map(|row| row.cloned())
494                        .map_err(DataflowError::from)
495                        .transpose()
496                })
497                .inner
498                .ok_err(|(x, t, d)| {
499                    // TODO(mcsherry): consider `ok_err()` for `Collection`.
500                    match x {
501                        Ok(x) => Ok((x, t, d)),
502                        Err(x) => Err((x, t, d)),
503                    }
504                });
505
506            (oks.as_collection(), Some(err.as_collection()))
507        } else {
508            let oks = self
509                .linear_join_spec
510                .render(prev_keyed, next_input, move |key, old, new| {
511                    let mut row_builder = SharedRow::get();
512                    let temp_storage = RowArena::new();
513
514                    let mut datums_local = datums.borrow();
515                    datums_local.extend(key.to_datum_iter());
516                    datums_local.extend(old.to_datum_iter());
517                    datums_local.extend(new.to_datum_iter());
518
519                    closure
520                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
521                        .expect("Closure claimed to never error")
522                        .cloned()
523                });
524
525            (oks, None)
526        }
527    }
528}