mz_compute/render/join/
linear_join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Rendering of linear join plans.
11//!
12//! Consult [LinearJoinPlan] documentation for details.
13
14use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Collection, Data};
21use mz_compute_types::dyncfgs::{
22    ENABLE_MZ_JOIN_CORE, ENABLE_MZ_JOIN_CORE_V2, LINEAR_JOIN_YIELDING,
23};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
26use mz_dyncfg::ConfigSet;
27use mz_repr::fixed_length::ToDatumIter;
28use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
29use mz_storage_types::errors::DataflowError;
30use mz_timely_util::columnar::builder::ColumnBuilder;
31use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
32use mz_timely_util::operator::{CollectionExt, StreamExt};
33use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
34use timely::dataflow::operators::OkErr;
35use timely::dataflow::scopes::Child;
36use timely::dataflow::{Scope, ScopeParent};
37use timely::progress::timestamp::Refines;
38
39use crate::extensions::arrange::MzArrangeCore;
40use crate::render::RenderTimestamp;
41use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownProbe};
42use crate::render::join::mz_join_core::mz_join_core;
43use crate::render::join::mz_join_core_v2::mz_join_core as mz_join_core_v2;
44use crate::row_spine::{RowRowBuilder, RowRowSpine};
45use crate::typedefs::{MzTimestamp, RowRowAgent, RowRowEnter};
46
/// Available linear join implementations.
///
/// Besides the join operator provided by Differential Dataflow, we offer two implementations of
/// our own. See the `mz_join_core` module docs for our rationale for providing join
/// implementations in addition to the Differential Dataflow one.
#[derive(Clone, Copy)]
enum LinearJoinImpl {
    /// Render joins with `mz_join_core`.
    Materialize,
    /// Render joins with `mz_join_core_v2`.
    MaterializeV2,
    /// Render joins with Differential Dataflow's `join_core`.
    DifferentialDataflow,
}
56
57/// Specification of how linear joins are to be executed.
58///
59/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD
60/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this.
61///
62/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390
63#[derive(Clone, Copy)]
64pub struct LinearJoinSpec {
65    implementation: LinearJoinImpl,
66    yielding: YieldSpec,
67}
68
69impl Default for LinearJoinSpec {
70    fn default() -> Self {
71        Self {
72            implementation: LinearJoinImpl::Materialize,
73            yielding: Default::default(),
74        }
75    }
76}
77
78impl LinearJoinSpec {
79    /// Create a `LinearJoinSpec` based on the given config.
80    pub fn from_config(config: &ConfigSet) -> Self {
81        let implementation = if ENABLE_MZ_JOIN_CORE_V2.get(config) {
82            LinearJoinImpl::MaterializeV2
83        } else if ENABLE_MZ_JOIN_CORE.get(config) {
84            LinearJoinImpl::Materialize
85        } else {
86            LinearJoinImpl::DifferentialDataflow
87        };
88
89        let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
90        let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
91            tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
92            YieldSpec::default()
93        });
94
95        Self {
96            implementation,
97            yielding,
98        }
99    }
100
101    /// Render a join operator according to this specification.
102    fn render<G, Tr1, Tr2, L, I>(
103        &self,
104        arranged1: &Arranged<G, Tr1>,
105        arranged2: &Arranged<G, Tr2>,
106        shutdown_probe: ShutdownProbe,
107        result: L,
108    ) -> Collection<G, I::Item, Diff>
109    where
110        G: Scope,
111        G::Timestamp: Lattice,
112        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
113        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
114            + Clone
115            + 'static,
116        L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
117        I: IntoIterator<Item: Data> + 'static,
118    {
119        use LinearJoinImpl::*;
120
121        match (
122            self.implementation,
123            self.yielding.after_work,
124            self.yielding.after_time,
125        ) {
126            (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
127            (Materialize, Some(work_limit), Some(time_limit)) => {
128                let yield_fn =
129                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
130                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
131            }
132            (Materialize, Some(work_limit), None) => {
133                let yield_fn = move |_start, work| work >= work_limit;
134                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
135            }
136            (Materialize, None, Some(time_limit)) => {
137                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
138                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
139            }
140            (Materialize, None, None) => {
141                let yield_fn = |_start, _work| false;
142                mz_join_core(arranged1, arranged2, shutdown_probe, result, yield_fn).as_collection()
143            }
144            (MaterializeV2, Some(work_limit), Some(time_limit)) => {
145                let yield_fn =
146                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
147                mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
148                    .as_collection()
149            }
150            (MaterializeV2, Some(work_limit), None) => {
151                let yield_fn = move |_start, work| work >= work_limit;
152                mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
153                    .as_collection()
154            }
155            (MaterializeV2, None, Some(time_limit)) => {
156                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
157                mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
158                    .as_collection()
159            }
160            (MaterializeV2, None, None) => {
161                let yield_fn = |_start, _work| false;
162                mz_join_core_v2(arranged1, arranged2, shutdown_probe, result, yield_fn)
163                    .as_collection()
164            }
165        }
166    }
167}
168
/// Describes when a dataflow operator should yield.
///
/// Each limit is optional; `None` means the corresponding limit is not applied.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Amount of work after which the operator yields, if limited.
    after_work: Option<usize>,
    /// Elapsed time after which the operator yields, if limited.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Limits both work (1,000,000 units) and time (100 milliseconds).
    fn default() -> Self {
        let after_time = Some(Duration::from_millis(100));
        let after_work = Some(1_000_000);
        Self {
            after_work,
            after_time,
        }
    }
}

impl YieldSpec {
    /// Parses a yielding specification such as `"work:1000000,time:100"`.
    ///
    /// The input is a comma-separated list of `key:value` options; whitespace around keys and
    /// values is ignored. Recognized options:
    ///
    /// * `work:<amount>` sets `after_work` to `<amount>`.
    /// * `time:<millis>` sets `after_time` to `<millis>` milliseconds.
    ///
    /// Options that are not given stay `None`, and a repeated option keeps its last value.
    /// Returns `None` if any option is unknown, lacks a `:`, or has a value that does not parse
    /// as an unsigned integer (this includes the empty string).
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            // Any `:` beyond the first stays in `value`, which then fails integer parsing.
            let (key, value) = option.split_once(':')?;
            let value = value.trim();
            match key.trim() {
                "work" => spec.after_work = Some(value.parse().ok()?),
                "time" => spec.after_time = Some(Duration::from_millis(value.parse().ok()?)),
                _ => return None,
            }
        }

        Some(spec)
    }
}
215
216/// Different forms the streamed data might take.
217enum JoinedFlavor<G, T>
218where
219    G: Scope,
220    G::Timestamp: Refines<T> + MzTimestamp,
221    T: MzTimestamp,
222{
223    /// Streamed data as a collection.
224    Collection(Collection<G, Row, Diff>),
225    /// A dataflow-local arrangement.
226    Local(Arranged<G, RowRowAgent<G::Timestamp, Diff>>),
227    /// An imported arrangement.
228    Trace(Arranged<G, RowRowEnter<T, Diff, G::Timestamp>>),
229}
230
231impl<G, T> Context<G, T>
232where
233    G: Scope,
234    G::Timestamp: Lattice + Refines<T> + RenderTimestamp,
235    T: MzTimestamp,
236{
237    pub(crate) fn render_join(
238        &self,
239        inputs: Vec<CollectionBundle<G, T>>,
240        linear_plan: LinearJoinPlan,
241    ) -> CollectionBundle<G, T> {
242        self.scope.clone().region_named("Join(Linear)", |inner| {
243            self.render_join_inner(inputs, linear_plan, inner)
244        })
245    }
246
247    fn render_join_inner(
248        &self,
249        inputs: Vec<CollectionBundle<G, T>>,
250        linear_plan: LinearJoinPlan,
251        inner: &mut Child<G, <G as ScopeParent>::Timestamp>,
252    ) -> CollectionBundle<G, T> {
253        // Collect all error streams, and concatenate them at the end.
254        let mut errors = Vec::new();
255
256        // Determine which form our maintained spine of updates will initially take.
257        // First, just check out the availability of an appropriate arrangement.
258        // This will be `None` in the degenerate single-input join case, which ensures
259        // that we do not panic if we never go around the `stage_plans` loop.
260        let arrangement = linear_plan
261            .stage_plans
262            .get(0)
263            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
264        // We can use an arrangement if it exists and an initial closure does not.
265        let mut joined = match (arrangement, linear_plan.initial_closure) {
266            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
267                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
268                JoinedFlavor::Local(oks.enter_region(inner))
269            }
270            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
271                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
272                JoinedFlavor::Trace(oks.enter_region(inner))
273            }
274            (_, initial_closure) => {
275                // TODO: extract closure from the first stage in the join plan, should it exist.
276                // TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
277                let (joined, errs) = inputs[linear_plan.source_relation]
278                    .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
279                errors.push(errs.enter_region(inner));
280                let mut joined = joined.enter_region(inner);
281
282                // In the current code this should always be `None`, but we have this here should
283                // we change that and want to know what we should be doing.
284                if let Some(closure) = initial_closure {
285                    // If there is no starting arrangement, then we can run filters
286                    // directly on the starting collection.
287                    // If there is only one input, we are done joining, so run filters
288                    let name = "LinearJoinInitialization";
289                    type CB<C> = ConsolidatingContainerBuilder<C>;
290                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
291                        // Reuseable allocation for unpacking.
292                        let mut datums = DatumVec::new();
293                        move |row| {
294                            let mut row_builder = SharedRow::get();
295                            let temp_storage = RowArena::new();
296                            let mut datums_local = datums.borrow_with(&row);
297                            // TODO(mcsherry): re-use `row` allocation.
298                            closure
299                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
300                                .map(|row| row.cloned())
301                                .map_err(DataflowError::from)
302                                .transpose()
303                        }
304                    });
305                    joined = j;
306                    errors.push(errs);
307                }
308
309                JoinedFlavor::Collection(joined)
310            }
311        };
312
313        // progress through stages, updating partial results and errors.
314        for stage_plan in linear_plan.stage_plans.into_iter() {
315            // Different variants of `joined` implement this differently,
316            // and the logic is centralized there.
317            let stream = self.differential_join(
318                joined,
319                inputs[stage_plan.lookup_relation].enter_region(inner),
320                stage_plan,
321                &mut errors,
322            );
323            // Update joined results and capture any errors.
324            joined = JoinedFlavor::Collection(stream);
325        }
326
327        // We have completed the join building, but may have work remaining.
328        // For example, we may have expressions not pushed down (e.g. literals)
329        // and projections that could not be applied (e.g. column repetition).
330        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
331            if let Some(closure) = linear_plan.final_closure {
332                let name = "LinearJoinFinalization";
333                type CB<C> = ConsolidatingContainerBuilder<C>;
334                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
335                    // Reuseable allocation for unpacking.
336                    let mut datums = DatumVec::new();
337                    move |row| {
338                        let mut row_builder = SharedRow::get();
339                        let temp_storage = RowArena::new();
340                        let mut datums_local = datums.borrow_with(&row);
341                        // TODO(mcsherry): re-use `row` allocation.
342                        closure
343                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
344                            .map(|row| row.cloned())
345                            .map_err(DataflowError::from)
346                            .transpose()
347                    }
348                });
349
350                joined = updates;
351                errors.push(errs);
352            }
353
354            // Return joined results and all produced errors collected together.
355            CollectionBundle::from_collections(
356                joined,
357                differential_dataflow::collection::concatenate(inner, errors),
358            )
359        } else {
360            panic!("Unexpectedly arranged join output");
361        };
362        bundle.leave_region()
363    }
364
365    /// Looks up the arrangement for the next input and joins it to the arranged
366    /// version of the join of previous inputs.
367    fn differential_join<S>(
368        &self,
369        mut joined: JoinedFlavor<S, T>,
370        lookup_relation: CollectionBundle<S, T>,
371        LinearStagePlan {
372            stream_key,
373            stream_thinning,
374            lookup_key,
375            closure,
376            lookup_relation: _,
377        }: LinearStagePlan,
378        errors: &mut Vec<Collection<S, DataflowError, Diff>>,
379    ) -> Collection<S, Row, Diff>
380    where
381        S: Scope<Timestamp = G::Timestamp>,
382    {
383        // If we have only a streamed collection, we must first form an arrangement.
384        if let JoinedFlavor::Collection(stream) = joined {
385            let name = "LinearJoinKeyPreparation";
386            let (keyed, errs) = stream
387                .inner
388                .unary_fallible::<ColumnBuilder<((Row, Row), S::Timestamp, Diff)>, _, _, _>(
389                    Pipeline,
390                    name,
391                    |_, _| {
392                        Box::new(move |input, ok, errs| {
393                            let mut temp_storage = RowArena::new();
394                            let mut key_buf = Row::default();
395                            let mut val_buf = Row::default();
396                            let mut datums = DatumVec::new();
397                            while let Some((time, data)) = input.next() {
398                                let mut ok_session = ok.session_with_builder(&time);
399                                let mut err_session = errs.session(&time);
400                                for (row, time, diff) in data.iter() {
401                                    temp_storage.clear();
402                                    let datums_local = datums.borrow_with(row);
403                                    let datums = stream_key
404                                        .iter()
405                                        .map(|e| e.eval(&datums_local, &temp_storage));
406                                    let result = key_buf.packer().try_extend(datums);
407                                    match result {
408                                        Ok(()) => {
409                                            val_buf.packer().extend(
410                                                stream_thinning.iter().map(|e| datums_local[*e]),
411                                            );
412                                            ok_session.give(((&key_buf, &val_buf), time, diff));
413                                        }
414                                        Err(e) => {
415                                            err_session.give((e.into(), time.clone(), *diff));
416                                        }
417                                    }
418                                }
419                            }
420                        })
421                    },
422                );
423
424            errors.push(errs.as_collection());
425
426            let arranged = keyed
427                .mz_arrange_core::<_, Col2ValBatcher<_, _,_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
428                    ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, S::Timestamp, Diff>),"JoinStage"
429                );
430            joined = JoinedFlavor::Local(arranged);
431        }
432
433        // Demultiplex the four different cross products of arrangement types we might have.
434        let arrangement = lookup_relation
435            .arrangement(&lookup_key[..])
436            .expect("Arrangement absent despite explicit construction");
437
438        match joined {
439            JoinedFlavor::Collection(_) => {
440                unreachable!("JoinedFlavor::Collection variant avoided at top of method");
441            }
442            JoinedFlavor::Local(local) => match arrangement {
443                ArrangementFlavor::Local(oks, errs1) => {
444                    let (oks, errs2) = self
445                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowAgent<_, _>>(
446                            local, oks, closure,
447                        );
448
449                    errors.push(errs1.as_collection(|k, _v| k.clone()));
450                    errors.extend(errs2);
451                    oks
452                }
453                ArrangementFlavor::Trace(_gid, oks, errs1) => {
454                    let (oks, errs2) = self
455                        .differential_join_inner::<_, RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
456                            local, oks, closure,
457                        );
458
459                    errors.push(errs1.as_collection(|k, _v| k.clone()));
460                    errors.extend(errs2);
461                    oks
462                }
463            },
464            JoinedFlavor::Trace(trace) => match arrangement {
465                ArrangementFlavor::Local(oks, errs1) => {
466                    let (oks, errs2) = self
467                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
468                            trace, oks, closure,
469                        );
470
471                    errors.push(errs1.as_collection(|k, _v| k.clone()));
472                    errors.extend(errs2);
473                    oks
474                }
475                ArrangementFlavor::Trace(_gid, oks, errs1) => {
476                    let (oks, errs2) = self
477                        .differential_join_inner::<_, RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
478                            trace, oks, closure,
479                        );
480
481                    errors.push(errs1.as_collection(|k, _v| k.clone()));
482                    errors.extend(errs2);
483                    oks
484                }
485            },
486        }
487    }
488
489    /// Joins the arrangement for `next_input` to the arranged version of the
490    /// join of previous inputs. This is split into its own method to enable
491    /// reuse of code with different types of `next_input`.
492    ///
493    /// The return type includes an optional error collection, which may be
494    /// `None` if we can determine that `closure` cannot error.
495    fn differential_join_inner<S, Tr1, Tr2>(
496        &self,
497        prev_keyed: Arranged<S, Tr1>,
498        next_input: Arranged<S, Tr2>,
499        closure: JoinClosure,
500    ) -> (
501        Collection<S, Row, Diff>,
502        Option<Collection<S, DataflowError, Diff>>,
503    )
504    where
505        S: Scope<Timestamp = G::Timestamp>,
506        Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
507        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff>
508            + Clone
509            + 'static,
510        for<'a> Tr1::Key<'a>: ToDatumIter,
511        for<'a> Tr1::Val<'a>: ToDatumIter,
512        for<'a> Tr2::Val<'a>: ToDatumIter,
513    {
514        // Reuseable allocation for unpacking.
515        let mut datums = DatumVec::new();
516
517        if closure.could_error() {
518            let (oks, err) = self
519                .linear_join_spec
520                .render(
521                    &prev_keyed,
522                    &next_input,
523                    self.shutdown_probe.clone(),
524                    move |key, old, new| {
525                        let mut row_builder = SharedRow::get();
526                        let temp_storage = RowArena::new();
527
528                        let mut datums_local = datums.borrow();
529                        datums_local.extend(key.to_datum_iter());
530                        datums_local.extend(old.to_datum_iter());
531                        datums_local.extend(new.to_datum_iter());
532
533                        closure
534                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
535                            .map(|row| row.cloned())
536                            .map_err(DataflowError::from)
537                            .transpose()
538                    },
539                )
540                .inner
541                .ok_err(|(x, t, d)| {
542                    // TODO(mcsherry): consider `ok_err()` for `Collection`.
543                    match x {
544                        Ok(x) => Ok((x, t, d)),
545                        Err(x) => Err((x, t, d)),
546                    }
547                });
548
549            (oks.as_collection(), Some(err.as_collection()))
550        } else {
551            let oks = self.linear_join_spec.render(
552                &prev_keyed,
553                &next_input,
554                self.shutdown_probe.clone(),
555                move |key, old, new| {
556                    let mut row_builder = SharedRow::get();
557                    let temp_storage = RowArena::new();
558
559                    let mut datums_local = datums.borrow();
560                    datums_local.extend(key.to_datum_iter());
561                    datums_local.extend(old.to_datum_iter());
562                    datums_local.extend(new.to_datum_iter());
563
564                    closure
565                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
566                        .expect("Closure claimed to never error")
567                        .cloned()
568                },
569            );
570
571            (oks, None)
572        }
573    }
574}