mz_compute/render/join/
linear_join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Rendering of linear join plans.
11//!
12//! Consult [LinearJoinPlan] documentation for details.
13
14use std::time::{Duration, Instant};
15
16use columnar::Columnar;
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::containers::Columnation;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::operators::arrange::arrangement::Arranged;
21use differential_dataflow::trace::TraceReader;
22use differential_dataflow::{AsCollection, Collection, Data};
23use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
26use mz_dyncfg::ConfigSet;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
29use mz_storage_types::errors::DataflowError;
30use mz_timely_util::containers::{Col2ValBatcher, ColumnBuilder, columnar_exchange};
31use mz_timely_util::operator::{CollectionExt, StreamExt};
32use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
33use timely::dataflow::operators::OkErr;
34use timely::dataflow::scopes::Child;
35use timely::dataflow::{Scope, ScopeParent};
36use timely::progress::timestamp::{Refines, Timestamp};
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::render::RenderTimestamp;
40use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken};
41use crate::render::join::mz_join_core::mz_join_core;
42use crate::row_spine::{RowRowBuilder, RowRowSpine};
43use crate::typedefs::{RowRowAgent, RowRowEnter};
44
/// Available linear join implementations.
///
/// See the `mz_join_core` module docs for our rationale for providing two join implementations.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LinearJoinImpl {
    /// Materialize's own join operator (`mz_join_core`), whose yielding can be configured.
    Materialize,
    /// Differential dataflow's `join_core` operator.
    DifferentialDataflow,
}
53
54/// Specification of how linear joins are to be executed.
55///
56/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD
57/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this.
58///
59/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390
60#[derive(Clone, Copy)]
61pub struct LinearJoinSpec {
62    implementation: LinearJoinImpl,
63    yielding: YieldSpec,
64}
65
66impl Default for LinearJoinSpec {
67    fn default() -> Self {
68        Self {
69            implementation: LinearJoinImpl::Materialize,
70            yielding: Default::default(),
71        }
72    }
73}
74
75impl LinearJoinSpec {
76    /// Create a `LinearJoinSpec` based on the given config.
77    pub fn from_config(config: &ConfigSet) -> Self {
78        let implementation = match ENABLE_MZ_JOIN_CORE.get(config) {
79            true => LinearJoinImpl::Materialize,
80            false => LinearJoinImpl::DifferentialDataflow,
81        };
82
83        let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
84        let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
85            tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
86            YieldSpec::default()
87        });
88
89        Self {
90            implementation,
91            yielding,
92        }
93    }
94
95    /// Render a join operator according to this specification.
96    fn render<G, Tr1, Tr2, L, I>(
97        &self,
98        arranged1: &Arranged<G, Tr1>,
99        arranged2: &Arranged<G, Tr2>,
100        shutdown_token: ShutdownToken,
101        result: L,
102    ) -> Collection<G, I::Item, Diff>
103    where
104        G: Scope,
105        G::Timestamp: Lattice,
106        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
107        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
108            + Clone
109            + 'static,
110        L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
111        I: IntoIterator,
112        I::Item: Data,
113    {
114        use LinearJoinImpl::*;
115
116        match (
117            self.implementation,
118            self.yielding.after_work,
119            self.yielding.after_time,
120        ) {
121            (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
122            (Materialize, Some(work_limit), Some(time_limit)) => {
123                let yield_fn =
124                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
125                mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
126            }
127            (Materialize, Some(work_limit), None) => {
128                let yield_fn = move |_start, work| work >= work_limit;
129                mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
130            }
131            (Materialize, None, Some(time_limit)) => {
132                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
133                mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
134            }
135            (Materialize, None, None) => {
136                let yield_fn = |_start, _work| false;
137                mz_join_core(arranged1, arranged2, shutdown_token, result, yield_fn).as_collection()
138            }
139        }
140    }
141}
142
/// Specification of a dataflow operator's yielding behavior.
///
/// A limit that is `None` is not checked at all.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct YieldSpec {
    /// Yield after the given amount of work was performed.
    after_work: Option<usize>,
    /// Yield after the given amount of time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// A work limit of 1,000,000 and a time limit of 100 milliseconds.
    fn default() -> Self {
        Self {
            after_work: Some(1_000_000),
            after_time: Some(Duration::from_millis(100)),
        }
    }
}

impl YieldSpec {
    /// Parses a yield specification from a comma-separated list of `key:value` options.
    ///
    /// Recognized options are `work:<amount>` and `time:<milliseconds>`. Whitespace around
    /// options, keys, and values is ignored. If an option is repeated, its last occurrence
    /// wins. Options that are not given stay `None`.
    ///
    /// Returns `None` in any of these cases:
    /// - an option is not of the form `key:value`;
    /// - an option uses an unknown key;
    /// - a value does not parse as an unsigned integer.
    ///
    /// This includes the empty string and a trailing comma.
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            // Splitting at the first `:` suffices: any further `:` lands in the value, which
            // then fails to parse as an integer.
            let (key, value) = option.split_once(':')?;
            match (key.trim(), value.trim()) {
                ("work", amount) => spec.after_work = Some(amount.parse().ok()?),
                ("time", millis) => {
                    spec.after_time = Some(Duration::from_millis(millis.parse().ok()?));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
189
190/// Different forms the streamed data might take.
191enum JoinedFlavor<G, T>
192where
193    G: Scope,
194    G::Timestamp: Lattice + Refines<T> + Columnation,
195    T: Timestamp + Lattice + Columnation,
196{
197    /// Streamed data as a collection.
198    Collection(Collection<G, Row, Diff>),
199    /// A dataflow-local arrangement.
200    Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
201    /// An imported arrangement.
202    Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
203}
204
205impl<G, T> Context<G, T>
206where
207    G: Scope,
208    G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
209    <G::Timestamp as Columnar>::Container: Clone + Send,
210    for<'a> <G::Timestamp as Columnar>::Ref<'a>: Ord + Copy,
211    T: Timestamp + Lattice + Columnation,
212{
213    pub(crate) fn render_join(
214        &self,
215        inputs: Vec<CollectionBundle<G, T>>,
216        linear_plan: LinearJoinPlan,
217    ) -> CollectionBundle<G, T> {
218        self.scope.clone().region_named("Join(Linear)", |inner| {
219            self.render_join_inner(inputs, linear_plan, inner)
220        })
221    }
222
223    fn render_join_inner(
224        &self,
225        inputs: Vec<CollectionBundle<G, T>>,
226        linear_plan: LinearJoinPlan,
227        inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
228    ) -> CollectionBundle<G, T> {
229        // Collect all error streams, and concatenate them at the end.
230        let mut errors = Vec::new();
231
232        // Determine which form our maintained spine of updates will initially take.
233        // First, just check out the availability of an appropriate arrangement.
234        // This will be `None` in the degenerate single-input join case, which ensures
235        // that we do not panic if we never go around the `stage_plans` loop.
236        let arrangement = linear_plan
237            .stage_plans
238            .get(0)
239            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
240        // We can use an arrangement if it exists and an initial closure does not.
241        let mut joined = match (arrangement, linear_plan.initial_closure) {
242            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
243                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
244                JoinedFlavor::Local(oks.enter_region(inner))
245            }
246            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
247                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
248                JoinedFlavor::Trace(oks.enter_region(inner))
249            }
250            (_, initial_closure) => {
251                // TODO: extract closure from the first stage in the join plan, should it exist.
252                // TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
253                let (joined, errs) = inputs[linear_plan.source_relation]
254                    .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
255                errors.push(errs.enter_region(inner));
256                let mut joined = joined.enter_region(inner);
257
258                // In the current code this should always be `None`, but we have this here should
259                // we change that and want to know what we should be doing.
260                if let Some(closure) = initial_closure {
261                    // If there is no starting arrangement, then we can run filters
262                    // directly on the starting collection.
263                    // If there is only one input, we are done joining, so run filters
264                    let name = "LinearJoinInitialization";
265                    type CB<C> = ConsolidatingContainerBuilder<C>;
266                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
267                        // Reuseable allocation for unpacking.
268                        let mut datums = DatumVec::new();
269                        move |row| {
270                            let binding = SharedRow::get();
271                            let mut row_builder = binding.borrow_mut();
272                            let temp_storage = RowArena::new();
273                            let mut datums_local = datums.borrow_with(&row);
274                            // TODO(mcsherry): re-use `row` allocation.
275                            closure
276                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
277                                .map(|row| row.cloned())
278                                .map_err(DataflowError::from)
279                                .transpose()
280                        }
281                    });
282                    joined = j;
283                    errors.push(errs);
284                }
285
286                JoinedFlavor::Collection(joined)
287            }
288        };
289
290        // progress through stages, updating partial results and errors.
291        for stage_plan in linear_plan.stage_plans.into_iter() {
292            // Different variants of `joined` implement this differently,
293            // and the logic is centralized there.
294            let stream = self.differential_join(
295                joined,
296                inputs[stage_plan.lookup_relation].enter_region(inner),
297                stage_plan,
298                &mut errors,
299            );
300            // Update joined results and capture any errors.
301            joined = JoinedFlavor::Collection(stream);
302        }
303
304        // We have completed the join building, but may have work remaining.
305        // For example, we may have expressions not pushed down (e.g. literals)
306        // and projections that could not be applied (e.g. column repetition).
307        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
308            if let Some(closure) = linear_plan.final_closure {
309                let name = "LinearJoinFinalization";
310                type CB<C> = ConsolidatingContainerBuilder<C>;
311                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
312                    // Reuseable allocation for unpacking.
313                    let mut datums = DatumVec::new();
314                    move |row| {
315                        let binding = SharedRow::get();
316                        let mut row_builder = binding.borrow_mut();
317                        let temp_storage = RowArena::new();
318                        let mut datums_local = datums.borrow_with(&row);
319                        // TODO(mcsherry): re-use `row` allocation.
320                        closure
321                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
322                            .map(|row| row.cloned())
323                            .map_err(DataflowError::from)
324                            .transpose()
325                    }
326                });
327
328                joined = updates;
329                errors.push(errs);
330            }
331
332            // Return joined results and all produced errors collected together.
333            CollectionBundle::from_collections(
334                joined,
335                differential_dataflow::collection::concatenate(inner, errors),
336            )
337        } else {
338            panic!("Unexpectedly arranged join output");
339        };
340        bundle.leave_region()
341    }
342
343    /// Looks up the arrangement for the next input and joins it to the arranged
344    /// version of the join of previous inputs.
345    fn differential_join<S>(
346        &self,
347        mut joined: JoinedFlavor<S, T>,
348        lookup_relation: CollectionBundle<S, T>,
349        LinearStagePlan {
350            stream_key,
351            stream_thinning,
352            lookup_key,
353            closure,
354            lookup_relation: _,
355        }: LinearStagePlan,
356        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
357    ) -> Collection<S, Row, Diff>
358    where
359        S: Scope<Timestamp = G::Timestamp>,
360    {
361        // If we have only a streamed collection, we must first form an arrangement.
362        if let JoinedFlavor::Collection(stream) = joined {
363            let name = "LinearJoinKeyPreparation";
364            let (keyed, errs) = stream
365                .inner
366                .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
367                    Pipeline,
368                    name,
369                    |_, _| {
370                        Box::new(move |input, ok, errs| {
371                            let mut temp_storage = RowArena::new();
372                            let mut key_buf = Row::default();
373                            let mut val_buf = Row::default();
374                            let mut datums = DatumVec::new();
375                            while let Some((time, data)) = input.next() {
376                                let mut ok_session = ok.session_with_builder(&time);
377                                let mut err_session = errs.session(&time);
378                                for (row, time, diff) in data.iter() {
379                                    temp_storage.clear();
380                                    let datums_local = datums.borrow_with(row);
381                                    let datums = stream_key
382                                        .iter()
383                                        .map(|e| e.eval(&datums_local, &temp_storage));
384                                    let result = key_buf.packer().try_extend(datums);
385                                    match result {
386                                        Ok(()) => {
387                                            val_buf.packer().extend(
388                                                stream_thinning.iter().map(|e| datums_local[*e]),
389                                            );
390                                            ok_session.give(((&key_buf, &val_buf), time, diff));
391                                        }
392                                        Err(e) => {
393                                            err_session.give((e.into(), time.clone(), *diff));
394                                        }
395                                    }
396                                }
397                            }
398                        })
399                    },
400                );
401
402            errors.push(errs.as_collection());
403
404            let arranged = keyed
405                .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
406                    ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
407                );
408            joined = JoinedFlavor::Local(arranged);
409        }
410
411        // Demultiplex the four different cross products of arrangement types we might have.
412        let arrangement = lookup_relation
413            .arrangement(&lookup_key[..])
414            .expect("Arrangement absent despite explicit construction");
415
416        match joined {
417            JoinedFlavor::Collection(_) => {
418                unreachable!("JoinedFlavor::Collection variant avoided at top of method");
419            }
420            JoinedFlavor::Local(local) => match arrangement {
421                ArrangementFlavor::Local(oks, errs1) => {
422                    let (oks, errs2) = self
423                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
424                            local, oks, closure,
425                        );
426
427                    errors.push(errs1.as_collection(|k, _v| k.clone()));
428                    errors.extend(errs2);
429                    oks
430                }
431                ArrangementFlavor::Trace(_gid, oks, errs1) => {
432                    let (oks, errs2) = self
433                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
434                            local, oks, closure,
435                        );
436
437                    errors.push(errs1.as_collection(|k, _v| k.clone()));
438                    errors.extend(errs2);
439                    oks
440                }
441            },
442            JoinedFlavor::Trace(trace) => match arrangement {
443                ArrangementFlavor::Local(oks, errs1) => {
444                    let (oks, errs2) = self
445                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
446                            trace, oks, closure,
447                        );
448
449                    errors.push(errs1.as_collection(|k, _v| k.clone()));
450                    errors.extend(errs2);
451                    oks
452                }
453                ArrangementFlavor::Trace(_gid, oks, errs1) => {
454                    let (oks, errs2) = self
455                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
456                            trace, oks, closure,
457                        );
458
459                    errors.push(errs1.as_collection(|k, _v| k.clone()));
460                    errors.extend(errs2);
461                    oks
462                }
463            },
464        }
465    }
466
467    /// Joins the arrangement for `next_input` to the arranged version of the
468    /// join of previous inputs. This is split into its own method to enable
469    /// reuse of code with different types of `next_input`.
470    ///
471    /// The return type includes an optional error collection, which may be
472    /// `None` if we can determine that `closure` cannot error.
473    fn differential_join_inner<S, Tr1, Tr2>(
474        &self,
475        prev_keyed: Arranged<S, Tr1>,
476        next_input: Arranged<S, Tr2>,
477        closure: JoinClosure,
478    ) -> (
479        Collection<S, Row, Diff>,
480        Option<Collection<S, DataflowError, Diff>>,
481    )
482    where
483        S: Scope<Timestamp = G::Timestamp>,
484        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
485        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
486            + Clone
487            + 'static,
488        for<'a> Tr1::Key<'a>: ToDatumIter,
489        for<'a> Tr1::Val<'a>: ToDatumIter,
490        for<'a> Tr2::Val<'a>: ToDatumIter,
491    {
492        // Reuseable allocation for unpacking.
493        let mut datums = DatumVec::new();
494
495        if closure.could_error() {
496            let (oks, err) = self
497                .linear_join_spec
498                .render(
499                    &prev_keyed,
500                    &next_input,
501                    self.shutdown_token.clone(),
502                    move |key, old, new| {
503                        let binding = SharedRow::get();
504                        let mut row_builder = binding.borrow_mut();
505                        let temp_storage = RowArena::new();
506
507                        let mut datums_local = datums.borrow();
508                        datums_local.extend(key.to_datum_iter());
509                        datums_local.extend(old.to_datum_iter());
510                        datums_local.extend(new.to_datum_iter());
511
512                        closure
513                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
514                            .map(|row| row.cloned())
515                            .map_err(DataflowError::from)
516                            .transpose()
517                    },
518                )
519                .inner
520                .ok_err(|(x, t, d)| {
521                    // TODO(mcsherry): consider `ok_err()` for `Collection`.
522                    match x {
523                        Ok(x) => Ok((x, t, d)),
524                        Err(x) => Err((x, t, d)),
525                    }
526                });
527
528            (oks.as_collection(), Some(err.as_collection()))
529        } else {
530            let oks = self.linear_join_spec.render(
531                &prev_keyed,
532                &next_input,
533                self.shutdown_token.clone(),
534                move |key, old, new| {
535                    let binding = SharedRow::get();
536                    let mut row_builder = binding.borrow_mut();
537                    let temp_storage = RowArena::new();
538
539                    let mut datums_local = datums.borrow();
540                    datums_local.extend(key.to_datum_iter());
541                    datums_local.extend(old.to_datum_iter());
542                    datums_local.extend(new.to_datum_iter());
543
544                    closure
545                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
546                        .expect("Closure claimed to never error")
547                        .cloned()
548                },
549            );
550
551            (oks, None)
552        }
553    }
554}