mz_compute_types/plan/
join.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Planning of `Plan::Join` operators, and supporting types.
11//!
12//! Join planning proceeds by repeatedly introducing collections that
13//! extend the set of available output columns. The expected location
14//! of each output column is determined by the order of inputs to the
15//! join operator: columns are appended in that order.
16//!
17//! While planning the join, we also have access to logic in the form
18//! of expressions, predicates, and projections that we intended to
19//! apply to the output of the join. This logic uses "output column
20//! reckoning" where columns are identified by their intended output
21//! position.
22//!
//! As we consider applying expressions to partial results, we will
//! place the results in column locations *after* the intended output
//! column locations. These output locations, together with the new
//! distinct identifiers for constructed expressions, form the "extended
//! output column reckoning", which is what we use when reasoning about
//! work still available to be done on the partial join results.
29
30use std::collections::BTreeMap;
31
32use mz_expr::{MapFilterProject, MirScalarExpr};
33use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
34use mz_repr::{Datum, Row, RowArena};
35use proptest::prelude::*;
36use proptest_derive::Arbitrary;
37use serde::{Deserialize, Serialize};
38
39pub mod delta_join;
40pub mod linear_join;
41
42pub use delta_join::DeltaJoinPlan;
43pub use linear_join::LinearJoinPlan;
44
45include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.join.rs"));
46
/// A complete enumeration of possible join plans to render.
///
/// Each variant wraps the plan type re-exported from the submodule of the
/// corresponding name (`linear_join` or `delta_join`).
// NOTE: the derived `Ord`/`PartialOrd` compare variants in declaration order, so
// reordering the variants changes how `JoinPlan` values compare.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum JoinPlan {
    /// A join implemented by a linear join.
    Linear(LinearJoinPlan),
    /// A join implemented by a delta join.
    Delta(DeltaJoinPlan),
}
55
56impl RustType<ProtoJoinPlan> for JoinPlan {
57    fn into_proto(&self) -> ProtoJoinPlan {
58        use proto_join_plan::Kind::*;
59        ProtoJoinPlan {
60            kind: Some(match self {
61                JoinPlan::Linear(inner) => Linear(inner.into_proto()),
62                JoinPlan::Delta(inner) => Delta(inner.into_proto()),
63            }),
64        }
65    }
66
67    fn from_proto(value: ProtoJoinPlan) -> Result<Self, TryFromProtoError> {
68        use proto_join_plan::Kind::*;
69        let kind = value
70            .kind
71            .ok_or_else(|| TryFromProtoError::missing_field("ProtoJoinPlan::kind"))?;
72        Ok(match kind {
73            Linear(inner) => JoinPlan::Linear(inner.into_rust()?),
74            Delta(inner) => JoinPlan::Delta(inner.into_rust()?),
75        })
76    }
77}
78
/// A manual closure implementation of filtering and logic application.
///
/// This manual implementation exists to express lifetime constraints clearly,
/// as there is a relationship between the borrowed lifetime of the closed-over
/// state and the arguments it takes when invoked. It was not clear how to do
/// this with a Rust closure (glorious battle was waged, but ultimately lost).
// NOTE: the derived `Ord`/`PartialOrd` compare fields in declaration order.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct JoinClosure {
    /// Lists of expressions that must agree for a row to be retained.
    ///
    /// `apply` evaluates the expressions of each list against the row and produces
    /// no output unless every expression in the list evaluates to the same value.
    pub ready_equivalences: Vec<Vec<MirScalarExpr>>,
    /// The map/filter/project plan that `apply` evaluates into the output row once
    /// every list in `ready_equivalences` has been checked.
    pub before: mz_expr::SafeMfpPlan,
}
92
93impl Arbitrary for JoinClosure {
94    type Parameters = ();
95    type Strategy = BoxedStrategy<Self>;
96
97    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
98        (
99            prop::collection::vec(prop::collection::vec(any::<MirScalarExpr>(), 0..3), 0..3),
100            any::<mz_expr::SafeMfpPlan>(),
101        )
102            .prop_map(|(ready_equivalences, before)| JoinClosure {
103                ready_equivalences,
104                before,
105            })
106            .boxed()
107    }
108}
109
110impl RustType<ProtoJoinClosure> for JoinClosure {
111    fn into_proto(&self) -> ProtoJoinClosure {
112        ProtoJoinClosure {
113            ready_equivalences: self.ready_equivalences.into_proto(),
114            before: Some(self.before.into_proto()),
115        }
116    }
117
118    fn from_proto(proto: ProtoJoinClosure) -> Result<Self, TryFromProtoError> {
119        Ok(Self {
120            ready_equivalences: proto.ready_equivalences.into_rust()?,
121            before: proto.before.into_rust_if_some("ProtoJoinClosure::before")?,
122        })
123    }
124}
125
126impl JoinClosure {
127    /// Applies per-row filtering and logic.
128    #[inline(always)]
129    pub fn apply<'a, 'row>(
130        &'a self,
131        datums: &mut Vec<Datum<'a>>,
132        temp_storage: &'a RowArena,
133        row: &'row mut Row,
134    ) -> Result<Option<&'row Row>, mz_expr::EvalError> {
135        for exprs in self.ready_equivalences.iter() {
136            // Each list of expressions should be equal to the same value.
137            let val = exprs[0].eval(&datums[..], temp_storage)?;
138            for expr in exprs[1..].iter() {
139                if expr.eval(datums, temp_storage)? != val {
140                    return Ok(None);
141                }
142            }
143        }
144        self.before.evaluate_into(datums, temp_storage, row)
145    }
146
147    /// Construct an instance of the closure from available columns.
148    ///
149    /// This method updates the available columns, equivalences, and
150    /// the `MapFilterProject` instance. The columns are updated to
151    /// include reference to any columns added by the application of
152    /// this logic, which might result from partial application of
153    /// the `MapFilterProject` instance.
154    ///
155    /// If all columns are available for `mfp`, this method works
156    /// extra hard to ensure that the closure contains all the work,
157    /// and `mfp` is left as an identity transform (which can then
158    /// be ignored).
159    fn build(
160        columns: &mut BTreeMap<usize, usize>,
161        equivalences: &mut Vec<Vec<MirScalarExpr>>,
162        mfp: &mut MapFilterProject,
163        permutation: BTreeMap<usize, usize>,
164        thinned_arity_with_key: usize,
165    ) -> Self {
166        // First, determine which columns should be compared due to `equivalences`.
167        let mut ready_equivalences = Vec::new();
168        for equivalence in equivalences.iter_mut() {
169            if let Some(pos) = equivalence
170                .iter()
171                .position(|e| e.support().into_iter().all(|c| columns.contains_key(&c)))
172            {
173                let mut should_equate = Vec::new();
174                let mut cursor = pos + 1;
175                while cursor < equivalence.len() {
176                    if equivalence[cursor]
177                        .support()
178                        .into_iter()
179                        .all(|c| columns.contains_key(&c))
180                    {
181                        // Remove expression and equate with the first bound expression.
182                        should_equate.push(equivalence.remove(cursor));
183                    } else {
184                        cursor += 1;
185                    }
186                }
187                if !should_equate.is_empty() {
188                    should_equate.push(equivalence[pos].clone());
189                    ready_equivalences.push(should_equate);
190                }
191            }
192        }
193        equivalences.retain(|e| e.len() > 1);
194        let permuted_columns = columns.iter().map(|(k, v)| (*k, permutation[v])).collect();
195        // Update ready_equivalences to reference correct column locations.
196        for exprs in ready_equivalences.iter_mut() {
197            for expr in exprs.iter_mut() {
198                expr.permute_map(&permuted_columns);
199            }
200        }
201
202        // Next, partition `mfp` into `before` and `after`, the former of which can be
203        // applied now.
204        let (mut before, after) = std::mem::replace(mfp, MapFilterProject::new(mfp.input_arity))
205            .partition(columns.clone(), columns.len());
206
207        // Add any newly created columns to `columns`. These columns may be referenced
208        // by `after`, and it will be important to track their locations.
209        let bonus_columns = before.projection.len() - before.input_arity;
210        for bonus_column in 0..bonus_columns {
211            columns.insert(mfp.input_arity + bonus_column, columns.len());
212        }
213
214        *mfp = after;
215
216        // Before constructing and returning the result, we can remove output columns of `before`
217        // that are not needed in further `equivalences` or by `after` (now `mfp`).
218        let mut demand = Vec::new();
219        demand.extend(mfp.demand());
220        for equivalence in equivalences.iter() {
221            for expr in equivalence.iter() {
222                demand.extend(expr.support());
223            }
224        }
225        demand.sort();
226        demand.dedup();
227        // We only want to remove columns that are presented as outputs (i.e. can be found as in
228        // `columns`). Other columns have yet to be introduced, and we shouldn't have any opinion
229        // about them yet.
230        demand.retain(|column| columns.contains_key(column));
231        // Project `before` output columns using current locations of demanded columns.
232        before = before.project(demand.iter().map(|column| columns[column]));
233        // Update `columns` to reflect location of retained columns.
234        columns.clear();
235        for (index, column) in demand.iter().enumerate() {
236            columns.insert(*column, index);
237        }
238
239        // If `mfp` is a permutation of the columns present in `columns`, then we can
240        // apply that permutation to `before` and `columns`, so that `mfp` becomes the
241        // identity operation.
242        if mfp.expressions.is_empty()
243            && mfp.predicates.is_empty()
244            && mfp.projection.len() == columns.len()
245            && mfp.projection.iter().all(|col| columns.contains_key(col))
246            && columns.keys().all(|col| mfp.projection.contains(col))
247        {
248            // The projection we want to apply to `before`  comes to us from `mfp` in the
249            // extended output column reckoning.
250            let projection = mfp
251                .projection
252                .iter()
253                .map(|col| columns[col])
254                .collect::<Vec<_>>();
255            before = before.project(projection);
256            // Update the physical locations of each output column.
257            columns.clear();
258            for (index, column) in mfp.projection.iter().enumerate() {
259                columns.insert(*column, index);
260            }
261        }
262
263        before.permute_fn(|c| permutation[&c], thinned_arity_with_key);
264
265        // `before` should not be modified after this point.
266        before.optimize();
267
268        // Cons up an instance of the closure with the closed-over state.
269        Self {
270            ready_equivalences,
271            before: before.into_plan().unwrap().into_nontemporal().unwrap(),
272        }
273    }
274
275    /// True iff the closure neither filters nor transforms records.
276    pub fn is_identity(&self) -> bool {
277        self.ready_equivalences.is_empty() && self.before.is_identity()
278    }
279
280    /// Returns true if evaluation could introduce an error on non-error inputs.
281    pub fn could_error(&self) -> bool {
282        self.before.could_error()
283            || self
284                .ready_equivalences
285                .iter()
286                .any(|es| es.iter().any(|e| e.could_error()))
287    }
288}
289
/// Maintained state as we construct join dataflows.
///
/// This state primarily tracks the *remaining* work that has not yet been applied to a
/// stream of partial results.
///
/// This state is meant to reconcile the logical operations that remain to apply (e.g.
/// filtering, expressions, projection) and the physical organization of the current stream
/// of data, whose columns may be partially assembled in non-standard locations and which
/// may already have been partially subjected to logic we need to apply.
#[derive(Debug)]
pub struct JoinBuildState {
    /// Map from expected locations in extended output column reckoning to physical locations.
    ///
    /// Newly presented columns are assigned the next free physical location, i.e. the
    /// number of entries in the map at the time they are inserted.
    column_map: BTreeMap<usize, usize>,
    /// A list of equivalence classes of expressions.
    ///
    /// Within each equivalence class, expressions must evaluate to the same result to pass
    /// the join expression. Importantly, "the same" should be evaluated with `Datum`s Rust
    /// equality, rather than the equality presented by the `BinaryFunc` equality operator.
    /// The distinction is important for null handling, at the least.
    equivalences: Vec<Vec<MirScalarExpr>>,
    /// The linear operator logic (maps, filters, and projection) that remains to be applied
    /// to the output of the join.
    ///
    /// When we advance through the construction of the join dataflow, we may be able to peel
    /// off some of this work, ideally reducing `mfp` to something nearly the identity.
    mfp: MapFilterProject,
}
317
318impl JoinBuildState {
319    /// Create a new join state and initial closure from initial values.
320    ///
321    /// The initial closure can be `None` which indicates that it is the identity operator.
322    fn new(
323        columns: std::ops::Range<usize>,
324        equivalences: &[Vec<MirScalarExpr>],
325        mfp: &MapFilterProject,
326    ) -> Self {
327        let mut column_map = BTreeMap::new();
328        for column in columns {
329            column_map.insert(column, column_map.len());
330        }
331        let mut equivalences = equivalences.to_vec();
332        mz_expr::canonicalize::canonicalize_equivalence_classes(&mut equivalences);
333        Self {
334            column_map,
335            equivalences,
336            mfp: mfp.clone(),
337        }
338    }
339
340    /// Present new columns and extract any newly available closure.
341    fn add_columns(
342        &mut self,
343        new_columns: std::ops::Range<usize>,
344        bound_expressions: &[MirScalarExpr],
345        thinned_arity_with_key: usize,
346        // The permutation to run on the join of the thinned collections
347        permutation: BTreeMap<usize, usize>,
348    ) -> JoinClosure {
349        // Remove each element of `bound_expressions` from `equivalences`, so that we
350        // avoid redundant predicate work. This removal also paves the way for
351        // more precise "demand" information going forward.
352        for equivalence in self.equivalences.iter_mut() {
353            equivalence.retain(|expr| !bound_expressions.contains(expr));
354        }
355        self.equivalences.retain(|e| e.len() > 1);
356
357        // Update our map of the sources of each column in the update stream.
358        for column in new_columns {
359            self.column_map.insert(column, self.column_map.len());
360        }
361
362        self.extract_closure(permutation, thinned_arity_with_key)
363    }
364
365    /// Extract a final `MapFilterProject` once all columns are available.
366    ///
367    /// If not all columns are available this method will likely panic.
368    /// This method differs from `extract_closure` in that it forcibly
369    /// completes the join, extracting projections and expressions that
370    /// may not be extracted with `extract_closure` (for example, literals,
371    /// permutations, and repetition of output columns).
372    ///
373    /// The resulting closure may be the identity operator, which can be
374    /// checked with the `is_identity()` method.
375    fn complete(self) -> JoinClosure {
376        let Self {
377            column_map,
378            mut equivalences,
379            mut mfp,
380        } = self;
381
382        for equivalence in equivalences.iter_mut() {
383            for expr in equivalence.iter_mut() {
384                expr.permute_map(&column_map);
385            }
386        }
387        let column_map_len = column_map.len();
388        mfp.permute_fn(|c| column_map[&c], column_map_len);
389        mfp.optimize();
390
391        JoinClosure {
392            ready_equivalences: equivalences,
393            before: mfp.into_plan().unwrap().into_nontemporal().unwrap(),
394        }
395    }
396
397    /// A method on `self` that extracts an available closure.
398    ///
399    /// The extracted closure is not guaranteed to be non-trivial. Sensitive users should
400    /// consider using the `.is_identity()` method to determine non-triviality.
401    fn extract_closure(
402        &mut self,
403        permutation: BTreeMap<usize, usize>,
404        thinned_arity_with_key: usize,
405    ) -> JoinClosure {
406        JoinClosure::build(
407            &mut self.column_map,
408            &mut self.equivalences,
409            &mut self.mfp,
410            permutation,
411            thinned_arity_with_key,
412        )
413    }
414}
415
// Property tests for the protobuf encoding of the join plan types.
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    proptest! {
        // Limit each property to 32 generated cases.
        #![proptest_config(ProptestConfig::with_cases(32))]

        // Round-trips an arbitrary `JoinPlan` through `ProtoJoinPlan` and checks that
        // the round trip succeeds and yields a value equal to the original.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn join_plan_protobuf_roundtrip(expect in any::<JoinPlan>() ) {
            let actual = protobuf_roundtrip::<_, ProtoJoinPlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}