mz_compute_types/plan/join.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Planning of `Plan::Join` operators, and supporting types.
11//!
12//! Join planning proceeds by repeatedly introducing collections that
13//! extend the set of available output columns. The expected location
14//! of each output column is determined by the order of inputs to the
15//! join operator: columns are appended in that order.
16//!
//! While planning the join, we also have access to logic in the form
//! of expressions, predicates, and projections that we intend to
//! apply to the output of the join. This logic uses "output column
//! reckoning", where columns are identified by their intended output
//! position.
22//!
//! As we consider applying expressions to partial results, we will
//! place the results in column locations *after* the intended output
//! column locations. These output locations, together with the new
//! distinct identifiers for constructed expressions, constitute "extended
//! output column reckoning", which is what we use when reasoning about
//! work still available to be done on the partial join results.
29
30use std::collections::BTreeMap;
31
32use mz_expr::{MapFilterProject, MirScalarExpr};
33use mz_repr::{Datum, Row, RowArena};
34use serde::{Deserialize, Serialize};
35
36pub mod delta_join;
37pub mod linear_join;
38
39pub use delta_join::DeltaJoinPlan;
40pub use linear_join::LinearJoinPlan;
41
/// A complete enumeration of possible join plans to render.
///
/// Each variant wraps the plan type re-exported from the corresponding
/// submodule of this module (`linear_join` or `delta_join`).
// NOTE: the derived `Ord`/`PartialOrd` compare variants by declaration order,
// so reordering the variants changes how plans sort.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum JoinPlan {
    /// A join implemented by a linear join.
    Linear(LinearJoinPlan),
    /// A join implemented by a delta join.
    Delta(DeltaJoinPlan),
}
50
/// A manual closure implementation of filtering and logic application.
///
/// This manual implementation exists to express lifetime constraints clearly,
/// as there is a relationship between the borrowed lifetime of the closed-over
/// state and the arguments it takes when invoked. It was not clear how to do
/// this with a Rust closure (glorious battle was waged, but ultimately lost).
///
/// The closure is invoked with [`JoinClosure::apply`], which first checks
/// `ready_equivalences` and then evaluates `before`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct JoinClosure {
    /// Classes of expressions whose values must all be equal for a row to pass.
    ///
    /// `apply` evaluates the first expression of each class and compares it, using
    /// `Datum` equality, against every other expression in that class; any mismatch
    /// rejects the row before `before` is evaluated.
    pub ready_equivalences: Vec<Vec<MirScalarExpr>>,
    /// Map, filter, and project logic applied to rows that satisfy
    /// `ready_equivalences`.
    ///
    /// `apply` evaluates it with `evaluate_into`, passing the caller-supplied `Row`
    /// buffer, and returns its result.
    pub before: mz_expr::SafeMfpPlan,
}
64
65impl JoinClosure {
66 /// Applies per-row filtering and logic.
67 #[inline(always)]
68 pub fn apply<'a, 'row>(
69 &'a self,
70 datums: &mut Vec<Datum<'a>>,
71 temp_storage: &'a RowArena,
72 row: &'row mut Row,
73 ) -> Result<Option<&'row Row>, mz_expr::EvalError> {
74 for exprs in self.ready_equivalences.iter() {
75 // Each list of expressions should be equal to the same value.
76 let val = exprs[0].eval(&datums[..], temp_storage)?;
77 for expr in exprs[1..].iter() {
78 if expr.eval(datums, temp_storage)? != val {
79 return Ok(None);
80 }
81 }
82 }
83 self.before.evaluate_into(datums, temp_storage, row)
84 }
85
86 /// Construct an instance of the closure from available columns.
87 ///
88 /// This method updates the available columns, equivalences, and
89 /// the `MapFilterProject` instance. The columns are updated to
90 /// include reference to any columns added by the application of
91 /// this logic, which might result from partial application of
92 /// the `MapFilterProject` instance.
93 ///
94 /// If all columns are available for `mfp`, this method works
95 /// extra hard to ensure that the closure contains all the work,
96 /// and `mfp` is left as an identity transform (which can then
97 /// be ignored).
98 fn build(
99 columns: &mut BTreeMap<usize, usize>,
100 equivalences: &mut Vec<Vec<MirScalarExpr>>,
101 mfp: &mut MapFilterProject,
102 permutation: BTreeMap<usize, usize>,
103 thinned_arity_with_key: usize,
104 ) -> Self {
105 // First, determine which columns should be compared due to `equivalences`.
106 let mut ready_equivalences = Vec::new();
107 for equivalence in equivalences.iter_mut() {
108 if let Some(pos) = equivalence
109 .iter()
110 .position(|e| e.support().into_iter().all(|c| columns.contains_key(&c)))
111 {
112 let mut should_equate = Vec::new();
113 let mut cursor = pos + 1;
114 while cursor < equivalence.len() {
115 if equivalence[cursor]
116 .support()
117 .into_iter()
118 .all(|c| columns.contains_key(&c))
119 {
120 // Remove expression and equate with the first bound expression.
121 should_equate.push(equivalence.remove(cursor));
122 } else {
123 cursor += 1;
124 }
125 }
126 if !should_equate.is_empty() {
127 should_equate.push(equivalence[pos].clone());
128 ready_equivalences.push(should_equate);
129 }
130 }
131 }
132 equivalences.retain(|e| e.len() > 1);
133 let permuted_columns = columns.iter().map(|(k, v)| (*k, permutation[v])).collect();
134 // Update ready_equivalences to reference correct column locations.
135 for exprs in ready_equivalences.iter_mut() {
136 for expr in exprs.iter_mut() {
137 expr.permute_map(&permuted_columns);
138 }
139 }
140
141 // Next, partition `mfp` into `before` and `after`, the former of which can be
142 // applied now.
143 let (mut before, after) = std::mem::replace(mfp, MapFilterProject::new(mfp.input_arity))
144 .partition(columns.clone(), columns.len());
145
146 // Add any newly created columns to `columns`. These columns may be referenced
147 // by `after`, and it will be important to track their locations.
148 let bonus_columns = before.projection.len() - before.input_arity;
149 for bonus_column in 0..bonus_columns {
150 columns.insert(mfp.input_arity + bonus_column, columns.len());
151 }
152
153 *mfp = after;
154
155 // Before constructing and returning the result, we can remove output columns of `before`
156 // that are not needed in further `equivalences` or by `after` (now `mfp`).
157 let mut demand = Vec::new();
158 demand.extend(mfp.demand());
159 for equivalence in equivalences.iter() {
160 for expr in equivalence.iter() {
161 demand.extend(expr.support());
162 }
163 }
164 demand.sort();
165 demand.dedup();
166 // We only want to remove columns that are presented as outputs (i.e. can be found as in
167 // `columns`). Other columns have yet to be introduced, and we shouldn't have any opinion
168 // about them yet.
169 demand.retain(|column| columns.contains_key(column));
170 // Project `before` output columns using current locations of demanded columns.
171 before = before.project(demand.iter().map(|column| columns[column]));
172 // Update `columns` to reflect location of retained columns.
173 columns.clear();
174 for (index, column) in demand.iter().enumerate() {
175 columns.insert(*column, index);
176 }
177
178 // If `mfp` is a permutation of the columns present in `columns`, then we can
179 // apply that permutation to `before` and `columns`, so that `mfp` becomes the
180 // identity operation.
181 if mfp.expressions.is_empty()
182 && mfp.predicates.is_empty()
183 && mfp.projection.len() == columns.len()
184 && mfp.projection.iter().all(|col| columns.contains_key(col))
185 && columns.keys().all(|col| mfp.projection.contains(col))
186 {
187 // The projection we want to apply to `before` comes to us from `mfp` in the
188 // extended output column reckoning.
189 let projection = mfp
190 .projection
191 .iter()
192 .map(|col| columns[col])
193 .collect::<Vec<_>>();
194 before = before.project(projection);
195 // Update the physical locations of each output column.
196 columns.clear();
197 for (index, column) in mfp.projection.iter().enumerate() {
198 columns.insert(*column, index);
199 }
200 }
201
202 before.permute_fn(|c| permutation[&c], thinned_arity_with_key);
203
204 // `before` should not be modified after this point.
205 before.optimize();
206
207 // Cons up an instance of the closure with the closed-over state.
208 Self {
209 ready_equivalences,
210 before: before.into_plan().unwrap().into_nontemporal().unwrap(),
211 }
212 }
213
214 /// True iff the closure neither filters nor transforms records.
215 pub fn is_identity(&self) -> bool {
216 self.ready_equivalences.is_empty() && self.before.is_identity()
217 }
218
219 /// True iff the closure does more than projections.
220 pub fn maps_or_filters(&self) -> bool {
221 !self.before.expressions.is_empty()
222 || !self.before.predicates.is_empty()
223 || !self.ready_equivalences.is_empty()
224 }
225
226 /// Returns true if evaluation could introduce an error on non-error inputs.
227 pub fn could_error(&self) -> bool {
228 self.before.could_error()
229 || self
230 .ready_equivalences
231 .iter()
232 .any(|es| es.iter().any(|e| e.could_error()))
233 }
234}
235
/// Maintained state as we construct join dataflows.
///
/// This state primarily tracks the *remaining* work that has not yet been applied to a
/// stream of partial results.
///
/// This state is meant to reconcile two things:
/// * the logical operations that remain to apply (e.g. filtering, expressions,
///   projection), and
/// * the physical organization of the current stream of data.
///
/// The stream's columns may be partially assembled in non-standard locations, and
/// may already have been partially subjected to logic we need to apply.
#[derive(Debug)]
pub struct JoinBuildState {
    /// Map from expected locations in extended output column reckoning to physical locations.
    column_map: BTreeMap<usize, usize>,
    /// A list of equivalence classes of expressions.
    ///
    /// Within each equivalence class, expressions must evaluate to the same result to pass
    /// the join expression. Importantly, "the same" should be evaluated with `Datum`s Rust
    /// equality, rather than the equality presented by the `BinaryFunc` equality operator.
    /// The distinction is important for null handling, at the least.
    equivalences: Vec<Vec<MirScalarExpr>>,
    /// The linear operator logic (maps, filters, and projection) that remains to be applied
    /// to the output of the join.
    ///
    /// When we advance through the construction of the join dataflow, we may be able to peel
    /// off some of this work, ideally reducing `mfp` to something nearly the identity.
    mfp: MapFilterProject,
}
263
264impl JoinBuildState {
265 /// Create a new join state and initial closure from initial values.
266 ///
267 /// The initial closure can be `None` which indicates that it is the identity operator.
268 fn new(
269 columns: std::ops::Range<usize>,
270 equivalences: &[Vec<MirScalarExpr>],
271 mfp: &MapFilterProject,
272 ) -> Self {
273 let mut column_map = BTreeMap::new();
274 for column in columns {
275 column_map.insert(column, column_map.len());
276 }
277 let mut equivalences = equivalences.to_vec();
278 mz_expr::canonicalize::canonicalize_equivalence_classes(&mut equivalences);
279 Self {
280 column_map,
281 equivalences,
282 mfp: mfp.clone(),
283 }
284 }
285
286 /// Present new columns and extract any newly available closure.
287 fn add_columns(
288 &mut self,
289 new_columns: std::ops::Range<usize>,
290 bound_expressions: &[MirScalarExpr],
291 thinned_arity_with_key: usize,
292 // The permutation to run on the join of the thinned collections
293 permutation: BTreeMap<usize, usize>,
294 ) -> JoinClosure {
295 // Remove each element of `bound_expressions` from `equivalences`, so that we
296 // avoid redundant predicate work. This removal also paves the way for
297 // more precise "demand" information going forward.
298 for equivalence in self.equivalences.iter_mut() {
299 equivalence.retain(|expr| !bound_expressions.contains(expr));
300 }
301 self.equivalences.retain(|e| e.len() > 1);
302
303 // Update our map of the sources of each column in the update stream.
304 for column in new_columns {
305 self.column_map.insert(column, self.column_map.len());
306 }
307
308 self.extract_closure(permutation, thinned_arity_with_key)
309 }
310
311 /// Extract a final `MapFilterProject` once all columns are available.
312 ///
313 /// If not all columns are available this method will likely panic.
314 /// This method differs from `extract_closure` in that it forcibly
315 /// completes the join, extracting projections and expressions that
316 /// may not be extracted with `extract_closure` (for example, literals,
317 /// permutations, and repetition of output columns).
318 ///
319 /// The resulting closure may be the identity operator, which can be
320 /// checked with the `is_identity()` method.
321 fn complete(self) -> JoinClosure {
322 let Self {
323 column_map,
324 mut equivalences,
325 mut mfp,
326 } = self;
327
328 for equivalence in equivalences.iter_mut() {
329 for expr in equivalence.iter_mut() {
330 expr.permute_map(&column_map);
331 }
332 }
333 let column_map_len = column_map.len();
334 mfp.permute_fn(|c| column_map[&c], column_map_len);
335 mfp.optimize();
336
337 JoinClosure {
338 ready_equivalences: equivalences,
339 before: mfp.into_plan().unwrap().into_nontemporal().unwrap(),
340 }
341 }
342
343 /// A method on `self` that extracts an available closure.
344 ///
345 /// The extracted closure is not guaranteed to be non-trivial. Sensitive users should
346 /// consider using the `.is_identity()` method to determine non-triviality.
347 fn extract_closure(
348 &mut self,
349 permutation: BTreeMap<usize, usize>,
350 thinned_arity_with_key: usize,
351 ) -> JoinClosure {
352 JoinClosure::build(
353 &mut self.column_map,
354 &mut self.equivalences,
355 &mut self.mfp,
356 permutation,
357 thinned_arity_with_key,
358 )
359 }
360}