mz_compute_types/plan/join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Planning of `Plan::Join` operators, and supporting types.
11//!
12//! Join planning proceeds by repeatedly introducing collections that
13//! extend the set of available output columns. The expected location
14//! of each output column is determined by the order of inputs to the
15//! join operator: columns are appended in that order.
16//!
//! While planning the join, we also have access to logic in the form
//! of expressions, predicates, and projections that we intend to
//! apply to the output of the join. This logic uses "output column
//! reckoning", where columns are identified by their intended output
//! position.
22//!
//! As we consider applying expressions to partial results, we will
//! place the results in column locations *after* the intended output
//! column locations. These output locations, together with the new
//! distinct identifiers for constructed expressions, form the "extended
//! output column reckoning", which is what we use when reasoning about
//! work still available to be done on the partial join results.
29
30use std::collections::BTreeMap;
31
32use mz_expr::{MapFilterProject, MirScalarExpr};
33use mz_repr::{Datum, Row, RowArena};
34use serde::{Deserialize, Serialize};
35
36pub mod delta_join;
37pub mod linear_join;
38
39pub use delta_join::DeltaJoinPlan;
40pub use linear_join::LinearJoinPlan;
41
42/// A complete enumeration of possible join plans to render.
43#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
44pub enum JoinPlan {
45    /// A join implemented by a linear join.
46    Linear(LinearJoinPlan),
47    /// A join implemented by a delta join.
48    Delta(DeltaJoinPlan),
49}
50
51/// A manual closure implementation of filtering and logic application.
52///
53/// This manual implementation exists to express lifetime constraints clearly,
54/// as there is a relationship between the borrowed lifetime of the closed-over
55/// state and the arguments it takes when invoked. It was not clear how to do
56/// this with a Rust closure (glorious battle was waged, but ultimately lost).
57#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
58pub struct JoinClosure {
59    /// TODO(database-issues#7533): Add documentation.
60    pub ready_equivalences: Vec<Vec<MirScalarExpr>>,
61    /// TODO(database-issues#7533): Add documentation.
62    pub before: mz_expr::SafeMfpPlan,
63}
64
65impl JoinClosure {
66    /// Applies per-row filtering and logic.
67    #[inline(always)]
68    pub fn apply<'a, 'row>(
69        &'a self,
70        datums: &mut Vec<Datum<'a>>,
71        temp_storage: &'a RowArena,
72        row: &'row mut Row,
73    ) -> Result<Option<&'row Row>, mz_expr::EvalError> {
74        for exprs in self.ready_equivalences.iter() {
75            // Each list of expressions should be equal to the same value.
76            let val = exprs[0].eval(&datums[..], temp_storage)?;
77            for expr in exprs[1..].iter() {
78                if expr.eval(datums, temp_storage)? != val {
79                    return Ok(None);
80                }
81            }
82        }
83        self.before.evaluate_into(datums, temp_storage, row)
84    }
85
86    /// Construct an instance of the closure from available columns.
87    ///
88    /// This method updates the available columns, equivalences, and
89    /// the `MapFilterProject` instance. The columns are updated to
90    /// include reference to any columns added by the application of
91    /// this logic, which might result from partial application of
92    /// the `MapFilterProject` instance.
93    ///
94    /// If all columns are available for `mfp`, this method works
95    /// extra hard to ensure that the closure contains all the work,
96    /// and `mfp` is left as an identity transform (which can then
97    /// be ignored).
98    fn build(
99        columns: &mut BTreeMap<usize, usize>,
100        equivalences: &mut Vec<Vec<MirScalarExpr>>,
101        mfp: &mut MapFilterProject,
102        permutation: BTreeMap<usize, usize>,
103        thinned_arity_with_key: usize,
104    ) -> Self {
105        // First, determine which columns should be compared due to `equivalences`.
106        let mut ready_equivalences = Vec::new();
107        for equivalence in equivalences.iter_mut() {
108            if let Some(pos) = equivalence
109                .iter()
110                .position(|e| e.support().into_iter().all(|c| columns.contains_key(&c)))
111            {
112                let mut should_equate = Vec::new();
113                let mut cursor = pos + 1;
114                while cursor < equivalence.len() {
115                    if equivalence[cursor]
116                        .support()
117                        .into_iter()
118                        .all(|c| columns.contains_key(&c))
119                    {
120                        // Remove expression and equate with the first bound expression.
121                        should_equate.push(equivalence.remove(cursor));
122                    } else {
123                        cursor += 1;
124                    }
125                }
126                if !should_equate.is_empty() {
127                    should_equate.push(equivalence[pos].clone());
128                    ready_equivalences.push(should_equate);
129                }
130            }
131        }
132        equivalences.retain(|e| e.len() > 1);
133        let permuted_columns = columns.iter().map(|(k, v)| (*k, permutation[v])).collect();
134        // Update ready_equivalences to reference correct column locations.
135        for exprs in ready_equivalences.iter_mut() {
136            for expr in exprs.iter_mut() {
137                expr.permute_map(&permuted_columns);
138            }
139        }
140
141        // Next, partition `mfp` into `before` and `after`, the former of which can be
142        // applied now.
143        let (mut before, after) = std::mem::replace(mfp, MapFilterProject::new(mfp.input_arity))
144            .partition(columns.clone(), columns.len());
145
146        // Add any newly created columns to `columns`. These columns may be referenced
147        // by `after`, and it will be important to track their locations.
148        let bonus_columns = before.projection.len() - before.input_arity;
149        for bonus_column in 0..bonus_columns {
150            columns.insert(mfp.input_arity + bonus_column, columns.len());
151        }
152
153        *mfp = after;
154
155        // Before constructing and returning the result, we can remove output columns of `before`
156        // that are not needed in further `equivalences` or by `after` (now `mfp`).
157        let mut demand = Vec::new();
158        demand.extend(mfp.demand());
159        for equivalence in equivalences.iter() {
160            for expr in equivalence.iter() {
161                demand.extend(expr.support());
162            }
163        }
164        demand.sort();
165        demand.dedup();
166        // We only want to remove columns that are presented as outputs (i.e. can be found as in
167        // `columns`). Other columns have yet to be introduced, and we shouldn't have any opinion
168        // about them yet.
169        demand.retain(|column| columns.contains_key(column));
170        // Project `before` output columns using current locations of demanded columns.
171        before = before.project(demand.iter().map(|column| columns[column]));
172        // Update `columns` to reflect location of retained columns.
173        columns.clear();
174        for (index, column) in demand.iter().enumerate() {
175            columns.insert(*column, index);
176        }
177
178        // If `mfp` is a permutation of the columns present in `columns`, then we can
179        // apply that permutation to `before` and `columns`, so that `mfp` becomes the
180        // identity operation.
181        if mfp.expressions.is_empty()
182            && mfp.predicates.is_empty()
183            && mfp.projection.len() == columns.len()
184            && mfp.projection.iter().all(|col| columns.contains_key(col))
185            && columns.keys().all(|col| mfp.projection.contains(col))
186        {
187            // The projection we want to apply to `before`  comes to us from `mfp` in the
188            // extended output column reckoning.
189            let projection = mfp
190                .projection
191                .iter()
192                .map(|col| columns[col])
193                .collect::<Vec<_>>();
194            before = before.project(projection);
195            // Update the physical locations of each output column.
196            columns.clear();
197            for (index, column) in mfp.projection.iter().enumerate() {
198                columns.insert(*column, index);
199            }
200        }
201
202        before.permute_fn(|c| permutation[&c], thinned_arity_with_key);
203
204        // `before` should not be modified after this point.
205        before.optimize();
206
207        // Cons up an instance of the closure with the closed-over state.
208        Self {
209            ready_equivalences,
210            before: before.into_plan().unwrap().into_nontemporal().unwrap(),
211        }
212    }
213
214    /// True iff the closure neither filters nor transforms records.
215    pub fn is_identity(&self) -> bool {
216        self.ready_equivalences.is_empty() && self.before.is_identity()
217    }
218
219    /// True iff the closure does more than projections.
220    pub fn maps_or_filters(&self) -> bool {
221        !self.before.expressions.is_empty()
222            || !self.before.predicates.is_empty()
223            || !self.ready_equivalences.is_empty()
224    }
225
226    /// Returns true if evaluation could introduce an error on non-error inputs.
227    pub fn could_error(&self) -> bool {
228        self.before.could_error()
229            || self
230                .ready_equivalences
231                .iter()
232                .any(|es| es.iter().any(|e| e.could_error()))
233    }
234}
235
236/// Maintained state as we construct join dataflows.
237///
238/// This state primarily tracks the *remaining* work that has not yet been applied to a
239/// stream of partial results.
240///
241/// This state is meant to reconcile the logical operations that remain to apply (e.g.
242/// filtering, expressions, projection) and the physical organization of the current stream
243/// of data, which columns may be partially assembled in non-standard locations and which
244/// may already have been partially subjected to logic we need to apply.
245#[derive(Debug)]
246pub struct JoinBuildState {
247    /// Map from expected locations in extended output column reckoning to physical locations.
248    column_map: BTreeMap<usize, usize>,
249    /// A list of equivalence classes of expressions.
250    ///
251    /// Within each equivalence class, expressions must evaluate to the same result to pass
252    /// the join expression. Importantly, "the same" should be evaluated with `Datum`s Rust
253    /// equality, rather than the equality presented by the `BinaryFunc` equality operator.
254    /// The distinction is important for null handling, at the least.
255    equivalences: Vec<Vec<MirScalarExpr>>,
256    /// The linear operator logic (maps, filters, and projection) that remains to be applied
257    /// to the output of the join.
258    ///
259    /// When we advance through the construction of the join dataflow, we may be able to peel
260    /// off some of this work, ideally reducing `mfp` to something nearly the identity.
261    mfp: MapFilterProject,
262}
263
264impl JoinBuildState {
265    /// Create a new join state and initial closure from initial values.
266    ///
267    /// The initial closure can be `None` which indicates that it is the identity operator.
268    fn new(
269        columns: std::ops::Range<usize>,
270        equivalences: &[Vec<MirScalarExpr>],
271        mfp: &MapFilterProject,
272    ) -> Self {
273        let mut column_map = BTreeMap::new();
274        for column in columns {
275            column_map.insert(column, column_map.len());
276        }
277        let mut equivalences = equivalences.to_vec();
278        mz_expr::canonicalize::canonicalize_equivalence_classes(&mut equivalences);
279        Self {
280            column_map,
281            equivalences,
282            mfp: mfp.clone(),
283        }
284    }
285
286    /// Present new columns and extract any newly available closure.
287    fn add_columns(
288        &mut self,
289        new_columns: std::ops::Range<usize>,
290        bound_expressions: &[MirScalarExpr],
291        thinned_arity_with_key: usize,
292        // The permutation to run on the join of the thinned collections
293        permutation: BTreeMap<usize, usize>,
294    ) -> JoinClosure {
295        // Remove each element of `bound_expressions` from `equivalences`, so that we
296        // avoid redundant predicate work. This removal also paves the way for
297        // more precise "demand" information going forward.
298        for equivalence in self.equivalences.iter_mut() {
299            equivalence.retain(|expr| !bound_expressions.contains(expr));
300        }
301        self.equivalences.retain(|e| e.len() > 1);
302
303        // Update our map of the sources of each column in the update stream.
304        for column in new_columns {
305            self.column_map.insert(column, self.column_map.len());
306        }
307
308        self.extract_closure(permutation, thinned_arity_with_key)
309    }
310
311    /// Extract a final `MapFilterProject` once all columns are available.
312    ///
313    /// If not all columns are available this method will likely panic.
314    /// This method differs from `extract_closure` in that it forcibly
315    /// completes the join, extracting projections and expressions that
316    /// may not be extracted with `extract_closure` (for example, literals,
317    /// permutations, and repetition of output columns).
318    ///
319    /// The resulting closure may be the identity operator, which can be
320    /// checked with the `is_identity()` method.
321    fn complete(self) -> JoinClosure {
322        let Self {
323            column_map,
324            mut equivalences,
325            mut mfp,
326        } = self;
327
328        for equivalence in equivalences.iter_mut() {
329            for expr in equivalence.iter_mut() {
330                expr.permute_map(&column_map);
331            }
332        }
333        let column_map_len = column_map.len();
334        mfp.permute_fn(|c| column_map[&c], column_map_len);
335        mfp.optimize();
336
337        JoinClosure {
338            ready_equivalences: equivalences,
339            before: mfp.into_plan().unwrap().into_nontemporal().unwrap(),
340        }
341    }
342
343    /// A method on `self` that extracts an available closure.
344    ///
345    /// The extracted closure is not guaranteed to be non-trivial. Sensitive users should
346    /// consider using the `.is_identity()` method to determine non-triviality.
347    fn extract_closure(
348        &mut self,
349        permutation: BTreeMap<usize, usize>,
350        thinned_arity_with_key: usize,
351    ) -> JoinClosure {
352        JoinClosure::build(
353            &mut self.column_map,
354            &mut self.equivalences,
355            &mut self.mfp,
356            permutation,
357            thinned_arity_with_key,
358        )
359    }
360}