mz_compute_types/plan/
join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Planning of `Plan::Join` operators, and supporting types.
11//!
12//! Join planning proceeds by repeatedly introducing collections that
13//! extend the set of available output columns. The expected location
14//! of each output column is determined by the order of inputs to the
15//! join operator: columns are appended in that order.
16//!
//! While planning the join, we also have access to logic in the form
//! of expressions, predicates, and projections that we intend to
//! apply to the output of the join. This logic uses "output column
//! reckoning", where columns are identified by their intended output
//! position.
22//!
//! As we consider applying expressions to partial results, we will
//! place the results in column locations *after* the intended output
//! column locations. These output locations, together with the new
//! distinct identifiers for constructed expressions, form "extended
//! output column reckoning", which is what we use when reasoning about
//! work still to be done on the partial join results.
29
30use std::collections::BTreeMap;
31
32use mz_expr::{MapFilterProject, MirScalarExpr};
33use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
34use mz_repr::{Datum, Row, RowArena};
35use proptest::prelude::*;
36use proptest_derive::Arbitrary;
37use serde::{Deserialize, Serialize};
38
39pub mod delta_join;
40pub mod linear_join;
41
42pub use delta_join::DeltaJoinPlan;
43pub use linear_join::LinearJoinPlan;
44
45include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.join.rs"));
46
47/// A complete enumeration of possible join plans to render.
48#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
49pub enum JoinPlan {
50    /// A join implemented by a linear join.
51    Linear(LinearJoinPlan),
52    /// A join implemented by a delta join.
53    Delta(DeltaJoinPlan),
54}
55
56impl RustType<ProtoJoinPlan> for JoinPlan {
57    fn into_proto(&self) -> ProtoJoinPlan {
58        use proto_join_plan::Kind::*;
59        ProtoJoinPlan {
60            kind: Some(match self {
61                JoinPlan::Linear(inner) => Linear(inner.into_proto()),
62                JoinPlan::Delta(inner) => Delta(inner.into_proto()),
63            }),
64        }
65    }
66
67    fn from_proto(value: ProtoJoinPlan) -> Result<Self, TryFromProtoError> {
68        use proto_join_plan::Kind::*;
69        let kind = value
70            .kind
71            .ok_or_else(|| TryFromProtoError::missing_field("ProtoJoinPlan::kind"))?;
72        Ok(match kind {
73            Linear(inner) => JoinPlan::Linear(inner.into_rust()?),
74            Delta(inner) => JoinPlan::Delta(inner.into_rust()?),
75        })
76    }
77}
78
79/// A manual closure implementation of filtering and logic application.
80///
81/// This manual implementation exists to express lifetime constraints clearly,
82/// as there is a relationship between the borrowed lifetime of the closed-over
83/// state and the arguments it takes when invoked. It was not clear how to do
84/// this with a Rust closure (glorious battle was waged, but ultimately lost).
85#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
86pub struct JoinClosure {
87    /// TODO(database-issues#7533): Add documentation.
88    pub ready_equivalences: Vec<Vec<MirScalarExpr>>,
89    /// TODO(database-issues#7533): Add documentation.
90    pub before: mz_expr::SafeMfpPlan,
91}
92
93impl Arbitrary for JoinClosure {
94    type Parameters = ();
95    type Strategy = BoxedStrategy<Self>;
96
97    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
98        (
99            prop::collection::vec(prop::collection::vec(any::<MirScalarExpr>(), 0..3), 0..3),
100            any::<mz_expr::SafeMfpPlan>(),
101        )
102            .prop_map(|(ready_equivalences, before)| JoinClosure {
103                ready_equivalences,
104                before,
105            })
106            .boxed()
107    }
108}
109
110impl RustType<ProtoJoinClosure> for JoinClosure {
111    fn into_proto(&self) -> ProtoJoinClosure {
112        ProtoJoinClosure {
113            ready_equivalences: self.ready_equivalences.into_proto(),
114            before: Some(self.before.into_proto()),
115        }
116    }
117
118    fn from_proto(proto: ProtoJoinClosure) -> Result<Self, TryFromProtoError> {
119        Ok(Self {
120            ready_equivalences: proto.ready_equivalences.into_rust()?,
121            before: proto.before.into_rust_if_some("ProtoJoinClosure::before")?,
122        })
123    }
124}
125
126impl JoinClosure {
127    /// Applies per-row filtering and logic.
128    #[inline(always)]
129    pub fn apply<'a, 'row>(
130        &'a self,
131        datums: &mut Vec<Datum<'a>>,
132        temp_storage: &'a RowArena,
133        row: &'row mut Row,
134    ) -> Result<Option<&'row Row>, mz_expr::EvalError> {
135        for exprs in self.ready_equivalences.iter() {
136            // Each list of expressions should be equal to the same value.
137            let val = exprs[0].eval(&datums[..], temp_storage)?;
138            for expr in exprs[1..].iter() {
139                if expr.eval(datums, temp_storage)? != val {
140                    return Ok(None);
141                }
142            }
143        }
144        self.before.evaluate_into(datums, temp_storage, row)
145    }
146
147    /// Construct an instance of the closure from available columns.
148    ///
149    /// This method updates the available columns, equivalences, and
150    /// the `MapFilterProject` instance. The columns are updated to
151    /// include reference to any columns added by the application of
152    /// this logic, which might result from partial application of
153    /// the `MapFilterProject` instance.
154    ///
155    /// If all columns are available for `mfp`, this method works
156    /// extra hard to ensure that the closure contains all the work,
157    /// and `mfp` is left as an identity transform (which can then
158    /// be ignored).
159    fn build(
160        columns: &mut BTreeMap<usize, usize>,
161        equivalences: &mut Vec<Vec<MirScalarExpr>>,
162        mfp: &mut MapFilterProject,
163        permutation: BTreeMap<usize, usize>,
164        thinned_arity_with_key: usize,
165    ) -> Self {
166        // First, determine which columns should be compared due to `equivalences`.
167        let mut ready_equivalences = Vec::new();
168        for equivalence in equivalences.iter_mut() {
169            if let Some(pos) = equivalence
170                .iter()
171                .position(|e| e.support().into_iter().all(|c| columns.contains_key(&c)))
172            {
173                let mut should_equate = Vec::new();
174                let mut cursor = pos + 1;
175                while cursor < equivalence.len() {
176                    if equivalence[cursor]
177                        .support()
178                        .into_iter()
179                        .all(|c| columns.contains_key(&c))
180                    {
181                        // Remove expression and equate with the first bound expression.
182                        should_equate.push(equivalence.remove(cursor));
183                    } else {
184                        cursor += 1;
185                    }
186                }
187                if !should_equate.is_empty() {
188                    should_equate.push(equivalence[pos].clone());
189                    ready_equivalences.push(should_equate);
190                }
191            }
192        }
193        equivalences.retain(|e| e.len() > 1);
194        let permuted_columns = columns.iter().map(|(k, v)| (*k, permutation[v])).collect();
195        // Update ready_equivalences to reference correct column locations.
196        for exprs in ready_equivalences.iter_mut() {
197            for expr in exprs.iter_mut() {
198                expr.permute_map(&permuted_columns);
199            }
200        }
201
202        // Next, partition `mfp` into `before` and `after`, the former of which can be
203        // applied now.
204        let (mut before, after) = std::mem::replace(mfp, MapFilterProject::new(mfp.input_arity))
205            .partition(columns.clone(), columns.len());
206
207        // Add any newly created columns to `columns`. These columns may be referenced
208        // by `after`, and it will be important to track their locations.
209        let bonus_columns = before.projection.len() - before.input_arity;
210        for bonus_column in 0..bonus_columns {
211            columns.insert(mfp.input_arity + bonus_column, columns.len());
212        }
213
214        *mfp = after;
215
216        // Before constructing and returning the result, we can remove output columns of `before`
217        // that are not needed in further `equivalences` or by `after` (now `mfp`).
218        let mut demand = Vec::new();
219        demand.extend(mfp.demand());
220        for equivalence in equivalences.iter() {
221            for expr in equivalence.iter() {
222                demand.extend(expr.support());
223            }
224        }
225        demand.sort();
226        demand.dedup();
227        // We only want to remove columns that are presented as outputs (i.e. can be found as in
228        // `columns`). Other columns have yet to be introduced, and we shouldn't have any opinion
229        // about them yet.
230        demand.retain(|column| columns.contains_key(column));
231        // Project `before` output columns using current locations of demanded columns.
232        before = before.project(demand.iter().map(|column| columns[column]));
233        // Update `columns` to reflect location of retained columns.
234        columns.clear();
235        for (index, column) in demand.iter().enumerate() {
236            columns.insert(*column, index);
237        }
238
239        // If `mfp` is a permutation of the columns present in `columns`, then we can
240        // apply that permutation to `before` and `columns`, so that `mfp` becomes the
241        // identity operation.
242        if mfp.expressions.is_empty()
243            && mfp.predicates.is_empty()
244            && mfp.projection.len() == columns.len()
245            && mfp.projection.iter().all(|col| columns.contains_key(col))
246            && columns.keys().all(|col| mfp.projection.contains(col))
247        {
248            // The projection we want to apply to `before`  comes to us from `mfp` in the
249            // extended output column reckoning.
250            let projection = mfp
251                .projection
252                .iter()
253                .map(|col| columns[col])
254                .collect::<Vec<_>>();
255            before = before.project(projection);
256            // Update the physical locations of each output column.
257            columns.clear();
258            for (index, column) in mfp.projection.iter().enumerate() {
259                columns.insert(*column, index);
260            }
261        }
262
263        before.permute_fn(|c| permutation[&c], thinned_arity_with_key);
264
265        // `before` should not be modified after this point.
266        before.optimize();
267
268        // Cons up an instance of the closure with the closed-over state.
269        Self {
270            ready_equivalences,
271            before: before.into_plan().unwrap().into_nontemporal().unwrap(),
272        }
273    }
274
275    /// True iff the closure neither filters nor transforms records.
276    pub fn is_identity(&self) -> bool {
277        self.ready_equivalences.is_empty() && self.before.is_identity()
278    }
279
280    /// True iff the closure does more than projections.
281    pub fn maps_or_filters(&self) -> bool {
282        !self.before.expressions.is_empty()
283            || !self.before.predicates.is_empty()
284            || !self.ready_equivalences.is_empty()
285    }
286
287    /// Returns true if evaluation could introduce an error on non-error inputs.
288    pub fn could_error(&self) -> bool {
289        self.before.could_error()
290            || self
291                .ready_equivalences
292                .iter()
293                .any(|es| es.iter().any(|e| e.could_error()))
294    }
295}
296
297/// Maintained state as we construct join dataflows.
298///
299/// This state primarily tracks the *remaining* work that has not yet been applied to a
300/// stream of partial results.
301///
302/// This state is meant to reconcile the logical operations that remain to apply (e.g.
303/// filtering, expressions, projection) and the physical organization of the current stream
304/// of data, which columns may be partially assembled in non-standard locations and which
305/// may already have been partially subjected to logic we need to apply.
306#[derive(Debug)]
307pub struct JoinBuildState {
308    /// Map from expected locations in extended output column reckoning to physical locations.
309    column_map: BTreeMap<usize, usize>,
310    /// A list of equivalence classes of expressions.
311    ///
312    /// Within each equivalence class, expressions must evaluate to the same result to pass
313    /// the join expression. Importantly, "the same" should be evaluated with `Datum`s Rust
314    /// equality, rather than the equality presented by the `BinaryFunc` equality operator.
315    /// The distinction is important for null handling, at the least.
316    equivalences: Vec<Vec<MirScalarExpr>>,
317    /// The linear operator logic (maps, filters, and projection) that remains to be applied
318    /// to the output of the join.
319    ///
320    /// When we advance through the construction of the join dataflow, we may be able to peel
321    /// off some of this work, ideally reducing `mfp` to something nearly the identity.
322    mfp: MapFilterProject,
323}
324
325impl JoinBuildState {
326    /// Create a new join state and initial closure from initial values.
327    ///
328    /// The initial closure can be `None` which indicates that it is the identity operator.
329    fn new(
330        columns: std::ops::Range<usize>,
331        equivalences: &[Vec<MirScalarExpr>],
332        mfp: &MapFilterProject,
333    ) -> Self {
334        let mut column_map = BTreeMap::new();
335        for column in columns {
336            column_map.insert(column, column_map.len());
337        }
338        let mut equivalences = equivalences.to_vec();
339        mz_expr::canonicalize::canonicalize_equivalence_classes(&mut equivalences);
340        Self {
341            column_map,
342            equivalences,
343            mfp: mfp.clone(),
344        }
345    }
346
347    /// Present new columns and extract any newly available closure.
348    fn add_columns(
349        &mut self,
350        new_columns: std::ops::Range<usize>,
351        bound_expressions: &[MirScalarExpr],
352        thinned_arity_with_key: usize,
353        // The permutation to run on the join of the thinned collections
354        permutation: BTreeMap<usize, usize>,
355    ) -> JoinClosure {
356        // Remove each element of `bound_expressions` from `equivalences`, so that we
357        // avoid redundant predicate work. This removal also paves the way for
358        // more precise "demand" information going forward.
359        for equivalence in self.equivalences.iter_mut() {
360            equivalence.retain(|expr| !bound_expressions.contains(expr));
361        }
362        self.equivalences.retain(|e| e.len() > 1);
363
364        // Update our map of the sources of each column in the update stream.
365        for column in new_columns {
366            self.column_map.insert(column, self.column_map.len());
367        }
368
369        self.extract_closure(permutation, thinned_arity_with_key)
370    }
371
372    /// Extract a final `MapFilterProject` once all columns are available.
373    ///
374    /// If not all columns are available this method will likely panic.
375    /// This method differs from `extract_closure` in that it forcibly
376    /// completes the join, extracting projections and expressions that
377    /// may not be extracted with `extract_closure` (for example, literals,
378    /// permutations, and repetition of output columns).
379    ///
380    /// The resulting closure may be the identity operator, which can be
381    /// checked with the `is_identity()` method.
382    fn complete(self) -> JoinClosure {
383        let Self {
384            column_map,
385            mut equivalences,
386            mut mfp,
387        } = self;
388
389        for equivalence in equivalences.iter_mut() {
390            for expr in equivalence.iter_mut() {
391                expr.permute_map(&column_map);
392            }
393        }
394        let column_map_len = column_map.len();
395        mfp.permute_fn(|c| column_map[&c], column_map_len);
396        mfp.optimize();
397
398        JoinClosure {
399            ready_equivalences: equivalences,
400            before: mfp.into_plan().unwrap().into_nontemporal().unwrap(),
401        }
402    }
403
404    /// A method on `self` that extracts an available closure.
405    ///
406    /// The extracted closure is not guaranteed to be non-trivial. Sensitive users should
407    /// consider using the `.is_identity()` method to determine non-triviality.
408    fn extract_closure(
409        &mut self,
410        permutation: BTreeMap<usize, usize>,
411        thinned_arity_with_key: usize,
412    ) -> JoinClosure {
413        JoinClosure::build(
414            &mut self.column_map,
415            &mut self.equivalences,
416            &mut self.mfp,
417            permutation,
418            thinned_arity_with_key,
419        )
420    }
421}
422
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(32))]

        // An arbitrary `JoinPlan` must survive encoding to and decoding from
        // `ProtoJoinPlan` unchanged.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn join_plan_protobuf_roundtrip(expect in any::<JoinPlan>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoJoinPlan>(&expect);
            assert_ok!(roundtripped);
            let decoded = roundtripped.unwrap();
            assert_eq!(decoded, expect);
        }
    }
}