Skip to main content

mz_compute/render/join/
linear_join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Rendering of linear join plans.
//!
//! Consult the [`LinearJoinPlan`] documentation for details.
13
14use std::time::{Duration, Instant};
15
16use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::arrangement::Arranged;
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::{AsCollection, Data, VecCollection};
21use mz_compute_types::dyncfgs::{
22    ENABLE_COLUMN_PAGED_BATCHER, ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING,
23};
24use mz_compute_types::plan::join::JoinClosure;
25use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
26use mz_dyncfg::ConfigSet;
27use mz_expr::Eval;
28use mz_repr::fixed_length::ToDatumIter;
29use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
30use mz_timely_util::columnar::batcher;
31use mz_timely_util::columnar::builder::ColumnBuilder;
32use mz_timely_util::columnar::{Col2ValBatcher, Col2ValPagedBatcher, columnar_exchange};
33use mz_timely_util::operator::{CollectionExt, StreamExt};
34use timely::dataflow::Scope;
35use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
36use timely::dataflow::operators::OkErr;
37
38use crate::extensions::arrange::MzArrangeCore;
39use crate::render::RenderTimestamp;
40use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
41use crate::render::errors::DataflowErrorSer;
42use crate::render::join::mz_join_core::mz_join_core;
43use crate::typedefs::{RowRowAgent, RowRowEnter};
44use mz_row_spine::{RowRowBuilder, RowRowColPagedBuilder, RowRowSpine};
45
/// The join operators a linear join can be rendered with.
///
/// The `mz_join_core` module docs explain why two implementations are provided.
#[derive(Copy, Clone)]
enum LinearJoinImpl {
    /// Materialize's own join operator (`mz_join_core`), whose yielding behavior is configurable.
    Materialize,
    /// Differential Dataflow's built-in `join_core` operator.
    DifferentialDataflow,
}
54
55/// Specification of how linear joins are to be executed.
56///
57/// Note that currently `yielding` only affects the `Materialize` join implementation, as the DD
58/// join doesn't allow configuring its yielding behavior. Merging [#390] would fix this.
59///
60/// [#390]: https://github.com/TimelyDataflow/differential-dataflow/pull/390
61#[derive(Clone, Copy)]
62pub struct LinearJoinSpec {
63    implementation: LinearJoinImpl,
64    yielding: YieldSpec,
65}
66
67impl Default for LinearJoinSpec {
68    fn default() -> Self {
69        Self {
70            implementation: LinearJoinImpl::Materialize,
71            yielding: Default::default(),
72        }
73    }
74}
75
76impl LinearJoinSpec {
77    /// Create a `LinearJoinSpec` based on the given config.
78    pub fn from_config(config: &ConfigSet) -> Self {
79        let implementation = if ENABLE_MZ_JOIN_CORE.get(config) {
80            LinearJoinImpl::Materialize
81        } else {
82            LinearJoinImpl::DifferentialDataflow
83        };
84
85        let yielding_raw = LINEAR_JOIN_YIELDING.get(config);
86        let yielding = YieldSpec::try_from_str(&yielding_raw).unwrap_or_else(|| {
87            tracing::error!("invalid LINEAR_JOIN_YIELDING config: {yielding_raw}");
88            YieldSpec::default()
89        });
90
91        Self {
92            implementation,
93            yielding,
94        }
95    }
96
97    /// Render a join operator according to this specification.
98    fn render<'s, T, Tr1, Tr2, L, I>(
99        &self,
100        arranged1: Arranged<'s, Tr1>,
101        arranged2: Arranged<'s, Tr2>,
102        result: L,
103    ) -> VecCollection<'s, T, I::Item, Diff>
104    where
105        T: Lattice + timely::progress::Timestamp,
106        Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
107        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
108        L: FnMut(Tr1::Key<'_>, Tr1::Val<'_>, Tr2::Val<'_>) -> I + 'static,
109        I: IntoIterator<Item: Data> + 'static,
110    {
111        use LinearJoinImpl::*;
112
113        match (
114            self.implementation,
115            self.yielding.after_work,
116            self.yielding.after_time,
117        ) {
118            (DifferentialDataflow, _, _) => arranged1.join_core(arranged2, result),
119            (Materialize, Some(work_limit), Some(time_limit)) => {
120                let yield_fn =
121                    move |start: Instant, work| work >= work_limit || start.elapsed() >= time_limit;
122                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
123            }
124            (Materialize, Some(work_limit), None) => {
125                let yield_fn = move |_start, work| work >= work_limit;
126                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
127            }
128            (Materialize, None, Some(time_limit)) => {
129                let yield_fn = move |start: Instant, _work| start.elapsed() >= time_limit;
130                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
131            }
132            (Materialize, None, None) => {
133                let yield_fn = |_start, _work| false;
134                mz_join_core(arranged1, arranged2, result, yield_fn).as_collection()
135            }
136        }
137    }
138}
139
/// Specification of a dataflow operator's yielding behavior.
///
/// Both limits are optional; a limit that is `None` is simply not checked.
#[derive(Clone, Copy)]
struct YieldSpec {
    /// Yield once at least this amount of work has been performed.
    after_work: Option<usize>,
    /// Yield once at least this amount of time has elapsed.
    after_time: Option<Duration>,
}

impl Default for YieldSpec {
    /// Yield after 1,000,000 units of work or 100 milliseconds.
    fn default() -> Self {
        let after_work = Some(1_000_000);
        let after_time = Some(Duration::from_millis(100));
        Self {
            after_work,
            after_time,
        }
    }
}

impl YieldSpec {
    /// Parses a yielding specification such as `"work:1000000,time:100"`.
    ///
    /// The input is a comma-separated list of `key:value` options. `work` takes an amount of
    /// work and `time` a number of milliseconds. Whitespace around each option and each
    /// separator is ignored, and a later option overrides an earlier one with the same key.
    /// Keys that do not appear stay `None`.
    ///
    /// Returns `None` if any option is malformed: an unknown key, a missing or extra `:`, an
    /// unparseable number, or an empty option (so `""` and trailing commas are rejected).
    fn try_from_str(s: &str) -> Option<Self> {
        let mut spec = Self {
            after_work: None,
            after_time: None,
        };

        for option in s.split(',') {
            // Any further `:` stays in `value`, which then fails to parse as an integer.
            let (key, value) = option.trim().split_once(':')?;
            let value = value.trim();
            match key.trim() {
                "work" => spec.after_work = Some(value.parse().ok()?),
                "time" => {
                    let millis: u64 = value.parse().ok()?;
                    spec.after_time = Some(Duration::from_millis(millis));
                }
                _ => return None,
            }
        }

        Some(spec)
    }
}
186
187/// Different forms the streamed data might take.
188enum JoinedFlavor<'scope, T: RenderTimestamp> {
189    /// Streamed data as a collection.
190    Collection(VecCollection<'scope, T, Row, Diff>),
191    /// A dataflow-local arrangement.
192    Local(Arranged<'scope, RowRowAgent<T, Diff>>),
193    /// An imported arrangement.
194    Trace(Arranged<'scope, RowRowEnter<mz_repr::Timestamp, Diff, T>>),
195}
196
197impl<'scope, T> Context<'scope, T>
198where
199    T: Lattice + RenderTimestamp,
200{
201    pub(crate) fn render_join(
202        &self,
203        inputs: Vec<CollectionBundle<'scope, T>>,
204        linear_plan: LinearJoinPlan,
205    ) -> CollectionBundle<'scope, T> {
206        self.scope.clone().region_named("Join(Linear)", |inner| {
207            self.render_join_inner(inputs, linear_plan, inner)
208        })
209    }
210
211    fn render_join_inner(
212        &self,
213        inputs: Vec<CollectionBundle<'scope, T>>,
214        linear_plan: LinearJoinPlan,
215        inner: Scope<'_, T>,
216    ) -> CollectionBundle<'scope, T> {
217        // Collect all error streams, and concatenate them at the end.
218        let mut errors = Vec::new();
219
220        // Determine which form our maintained spine of updates will initially take.
221        // First, just check out the availability of an appropriate arrangement.
222        // This will be `None` in the degenerate single-input join case, which ensures
223        // that we do not panic if we never go around the `stage_plans` loop.
224        let arrangement = linear_plan
225            .stage_plans
226            .get(0)
227            .and_then(|stage| inputs[linear_plan.source_relation].arrangement(&stage.stream_key));
228        // We can use an arrangement if it exists and an initial closure does not.
229        let mut joined = match (arrangement, linear_plan.initial_closure) {
230            (Some(ArrangementFlavor::Local(oks, errs)), None) => {
231                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
232                JoinedFlavor::Local(oks.enter_region(inner))
233            }
234            (Some(ArrangementFlavor::Trace(_gid, oks, errs)), None) => {
235                errors.push(errs.as_collection(|k, _v| k.clone()).enter_region(inner));
236                JoinedFlavor::Trace(oks.enter_region(inner))
237            }
238            (_, initial_closure) => {
239                // TODO: extract closure from the first stage in the join plan, should it exist.
240                // TODO: apply that closure in `flat_map_ref` rather than calling `.collection`.
241                let (joined, errs) = inputs[linear_plan.source_relation]
242                    .as_specific_collection(linear_plan.source_key.as_deref(), &self.config_set);
243                errors.push(errs.enter_region(inner));
244                let mut joined = joined.enter_region(inner);
245
246                // In the current code this should always be `None`, but we have this here should
247                // we change that and want to know what we should be doing.
248                if let Some(closure) = initial_closure {
249                    // If there is no starting arrangement, then we can run filters
250                    // directly on the starting collection.
251                    // If there is only one input, we are done joining, so run filters
252                    let name = "LinearJoinInitialization";
253                    type CB<C> = ConsolidatingContainerBuilder<C>;
254                    let (j, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
255                        // Reuseable allocation for unpacking.
256                        let mut datums = DatumVec::new();
257                        move |row| {
258                            let mut row_builder = SharedRow::get();
259                            let temp_storage = RowArena::new();
260                            let mut datums_local = datums.borrow_with(&row);
261                            // TODO(mcsherry): re-use `row` allocation.
262                            closure
263                                .apply(&mut datums_local, &temp_storage, &mut row_builder)
264                                .map(|row| row.cloned())
265                                .map_err(DataflowErrorSer::from)
266                                .transpose()
267                        }
268                    });
269                    joined = j;
270                    errors.push(errs);
271                }
272
273                JoinedFlavor::Collection(joined)
274            }
275        };
276
277        // progress through stages, updating partial results and errors.
278        for stage_plan in linear_plan.stage_plans.into_iter() {
279            // Different variants of `joined` implement this differently,
280            // and the logic is centralized there.
281            let stream = self.differential_join(
282                joined,
283                inputs[stage_plan.lookup_relation].enter_region(inner),
284                stage_plan,
285                &mut errors,
286            );
287            // Update joined results and capture any errors.
288            joined = JoinedFlavor::Collection(stream);
289        }
290
291        // We have completed the join building, but may have work remaining.
292        // For example, we may have expressions not pushed down (e.g. literals)
293        // and projections that could not be applied (e.g. column repetition).
294        let bundle = if let JoinedFlavor::Collection(mut joined) = joined {
295            if let Some(closure) = linear_plan.final_closure {
296                let name = "LinearJoinFinalization";
297                type CB<C> = ConsolidatingContainerBuilder<C>;
298                let (updates, errs) = joined.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>(name, {
299                    // Reuseable allocation for unpacking.
300                    let mut datums = DatumVec::new();
301                    move |row| {
302                        let mut row_builder = SharedRow::get();
303                        let temp_storage = RowArena::new();
304                        let mut datums_local = datums.borrow_with(&row);
305                        // TODO(mcsherry): re-use `row` allocation.
306                        closure
307                            .apply(&mut datums_local, &temp_storage, &mut row_builder)
308                            .map(|row| row.cloned())
309                            .map_err(DataflowErrorSer::from)
310                            .transpose()
311                    }
312                });
313
314                joined = updates;
315                errors.push(errs);
316            }
317
318            // Return joined results and all produced errors collected together.
319            CollectionBundle::from_collections(
320                joined,
321                differential_dataflow::collection::concatenate(inner, errors),
322            )
323        } else {
324            panic!("Unexpectedly arranged join output");
325        };
326        bundle.leave_region(self.scope)
327    }
328
329    /// Looks up the arrangement for the next input and joins it to the arranged
330    /// version of the join of previous inputs.
331    fn differential_join<'s>(
332        &self,
333        mut joined: JoinedFlavor<'s, T>,
334        lookup_relation: CollectionBundle<'s, T>,
335        LinearStagePlan {
336            stream_key,
337            stream_thinning,
338            lookup_key,
339            closure,
340            lookup_relation: _,
341        }: LinearStagePlan,
342        errors: &mut Vec<VecCollection<'s, T, DataflowErrorSer, Diff>>,
343    ) -> VecCollection<'s, T, Row, Diff> {
344        // If we have only a streamed collection, we must first form an arrangement.
345        if let JoinedFlavor::Collection(stream) = joined {
346            let name = "LinearJoinKeyPreparation";
347            let (keyed, errs) = stream
348                .inner
349                .unary_fallible::<ColumnBuilder<((Row, Row), T, Diff)>, _, _, _>(
350                    Pipeline,
351                    name,
352                    |_, _| {
353                        Box::new(move |input, ok, errs| {
354                            let mut temp_storage = RowArena::new();
355                            let mut key_buf = Row::default();
356                            let mut val_buf = Row::default();
357                            let mut datums = DatumVec::new();
358                            input.for_each(|time, data| {
359                                let mut ok_session = ok.session_with_builder(&time);
360                                let mut err_session = errs.session(&time);
361                                for (row, time, diff) in data.iter() {
362                                    temp_storage.clear();
363                                    let datums_local = datums.borrow_with(row);
364                                    let datums = stream_key
365                                        .iter()
366                                        .map(|e| e.eval(&datums_local, &temp_storage));
367                                    let result = key_buf.packer().try_extend(datums);
368                                    match result {
369                                        Ok(()) => {
370                                            val_buf.packer().extend(
371                                                stream_thinning.iter().map(|e| datums_local[*e]),
372                                            );
373                                            ok_session.give(((&key_buf, &val_buf), time, diff));
374                                        }
375                                        Err(e) => {
376                                            err_session.give((e.into(), time.clone(), *diff));
377                                        }
378                                    }
379                                }
380                            });
381                        })
382                    },
383                );
384
385            errors.push(errs.as_collection());
386
387            let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
388                columnar_exchange::<Row, Row, T, Diff>,
389            );
390            let arranged = if ENABLE_COLUMN_PAGED_BATCHER.get(&self.config_set) {
391                keyed.mz_arrange_core::<
392                    _,
393                    batcher::ColumnChunker<_>,
394                    Col2ValPagedBatcher<_, _, _, _>,
395                    RowRowColPagedBuilder<_, _>,
396                    RowRowSpine<_, _>,
397                >(exchange, "JoinStage")
398            } else {
399                keyed.mz_arrange_core::<
400                    _,
401                    batcher::Chunker<_>,
402                    Col2ValBatcher<_, _, _, _>,
403                    RowRowBuilder<_, _>,
404                    RowRowSpine<_, _>,
405                >(exchange, "JoinStage")
406            };
407            joined = JoinedFlavor::Local(arranged);
408        }
409
410        // Demultiplex the four different cross products of arrangement types we might have.
411        let arrangement = lookup_relation
412            .arrangement(&lookup_key[..])
413            .expect("Arrangement absent despite explicit construction");
414
415        match joined {
416            JoinedFlavor::Collection(_) => {
417                unreachable!("JoinedFlavor::VecCollection variant avoided at top of method");
418            }
419            JoinedFlavor::Local(local) => match arrangement {
420                ArrangementFlavor::Local(oks, errs1) => {
421                    let (oks, errs2) = self
422                        .differential_join_inner::<RowRowAgent<_, _>, RowRowAgent<_, _>>(
423                            local, oks, closure,
424                        );
425
426                    errors.push(errs1.as_collection(|k, _v| k.clone()));
427                    errors.extend(errs2);
428                    oks
429                }
430                ArrangementFlavor::Trace(_gid, oks, errs1) => {
431                    let (oks, errs2) = self
432                        .differential_join_inner::<RowRowAgent<_, _>, RowRowEnter<_, _, _>>(
433                            local, oks, closure,
434                        );
435
436                    errors.push(errs1.as_collection(|k, _v| k.clone()));
437                    errors.extend(errs2);
438                    oks
439                }
440            },
441            JoinedFlavor::Trace(trace) => match arrangement {
442                ArrangementFlavor::Local(oks, errs1) => {
443                    let (oks, errs2) = self
444                        .differential_join_inner::<RowRowEnter<_, _, _>, RowRowAgent<_, _>>(
445                            trace, oks, closure,
446                        );
447
448                    errors.push(errs1.as_collection(|k, _v| k.clone()));
449                    errors.extend(errs2);
450                    oks
451                }
452                ArrangementFlavor::Trace(_gid, oks, errs1) => {
453                    let (oks, errs2) = self
454                        .differential_join_inner::<RowRowEnter<_, _, _>, RowRowEnter<_, _, _>>(
455                            trace, oks, closure,
456                        );
457
458                    errors.push(errs1.as_collection(|k, _v| k.clone()));
459                    errors.extend(errs2);
460                    oks
461                }
462            },
463        }
464    }
465
466    /// Joins the arrangement for `next_input` to the arranged version of the
467    /// join of previous inputs. This is split into its own method to enable
468    /// reuse of code with different types of `next_input`.
469    ///
470    /// The return type includes an optional error collection, which may be
471    /// `None` if we can determine that `closure` cannot error.
472    fn differential_join_inner<'s, Tr1, Tr2>(
473        &self,
474        prev_keyed: Arranged<'s, Tr1>,
475        next_input: Arranged<'s, Tr2>,
476        closure: JoinClosure,
477    ) -> (
478        VecCollection<'s, T, Row, Diff>,
479        Option<VecCollection<'s, T, DataflowErrorSer, Diff>>,
480    )
481    where
482        Tr1: TraceReader<Time = T, Diff = Diff> + Clone + 'static,
483        Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = T, Diff = Diff> + Clone + 'static,
484        for<'a> Tr1::Key<'a>: ToDatumIter,
485        for<'a> Tr1::Val<'a>: ToDatumIter,
486        for<'a> Tr2::Val<'a>: ToDatumIter,
487    {
488        // Reuseable allocation for unpacking.
489        let mut datums = DatumVec::new();
490
491        if closure.could_error() {
492            let (oks, err) = self
493                .linear_join_spec
494                .render(prev_keyed, next_input, move |key, old, new| {
495                    let mut row_builder = SharedRow::get();
496                    let temp_storage = RowArena::new();
497
498                    let mut datums_local = datums.borrow();
499                    datums_local.extend(key.to_datum_iter());
500                    datums_local.extend(old.to_datum_iter());
501                    datums_local.extend(new.to_datum_iter());
502
503                    closure
504                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
505                        .map(|row| row.cloned())
506                        .map_err(DataflowErrorSer::from)
507                        .transpose()
508                })
509                .inner
510                .ok_err(|(x, t, d)| {
511                    // TODO(mcsherry): consider `ok_err()` for `Collection`.
512                    match x {
513                        Ok(x) => Ok((x, t, d)),
514                        Err(x) => Err((x, t, d)),
515                    }
516                });
517
518            (oks.as_collection(), Some(err.as_collection()))
519        } else {
520            let oks = self
521                .linear_join_spec
522                .render(prev_keyed, next_input, move |key, old, new| {
523                    let mut row_builder = SharedRow::get();
524                    let temp_storage = RowArena::new();
525
526                    let mut datums_local = datums.borrow();
527                    datums_local.extend(key.to_datum_iter());
528                    datums_local.extend(old.to_datum_iter());
529                    datums_local.extend(new.to_datum_iter());
530
531                    closure
532                        .apply(&mut datums_local, &temp_storage, &mut row_builder)
533                        .expect("Closure claimed to never error")
534                        .cloned()
535                });
536
537            (oks, None)
538        }
539    }
540}