mz_compute_types/plan/join.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Planning of `Plan::Join` operators, and supporting types.
11//!
12//! Join planning proceeds by repeatedly introducing collections that
13//! extend the set of available output columns. The expected location
14//! of each output column is determined by the order of inputs to the
15//! join operator: columns are appended in that order.
16//!
17//! While planning the join, we also have access to logic in the form
18//! of expressions, predicates, and projections that we intended to
19//! apply to the output of the join. This logic uses "output column
20//! reckoning" where columns are identified by their intended output
21//! position.
22//!
//! As we consider applying expressions to partial results, we will
//! place the results in column locations *after* the intended output
//! column locations. These output locations, together with the new
//! distinct identifiers for constructed expressions, form the "extended
//! output column reckoning", which is what we use when reasoning about
//! work still available to be done on the partial join results.
29
30use std::collections::BTreeMap;
31
32use mz_expr::{MapFilterProject, MirScalarExpr};
33use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
34use mz_repr::{Datum, Row, RowArena};
35use proptest::prelude::*;
36use proptest_derive::Arbitrary;
37use serde::{Deserialize, Serialize};
38
39pub mod delta_join;
40pub mod linear_join;
41
42pub use delta_join::DeltaJoinPlan;
43pub use linear_join::LinearJoinPlan;
44
// Textually includes the file `mz_compute_types.plan.join.rs` from the build's `OUT_DIR`.
// NOTE(review): presumably this is the protobuf-generated code that defines the `Proto*`
// types used by the `RustType` impls below -- confirm against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.join.rs"));
46
/// A complete enumeration of possible join plans to render.
///
/// Each variant wraps the plan type re-exported from the corresponding submodule
/// (`linear_join` or `delta_join`). Values convert to and from `ProtoJoinPlan` through
/// the `RustType` implementation in this module.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum JoinPlan {
    /// A join implemented by a linear join.
    Linear(LinearJoinPlan),
    /// A join implemented by a delta join.
    Delta(DeltaJoinPlan),
}
55
56impl RustType<ProtoJoinPlan> for JoinPlan {
57 fn into_proto(&self) -> ProtoJoinPlan {
58 use proto_join_plan::Kind::*;
59 ProtoJoinPlan {
60 kind: Some(match self {
61 JoinPlan::Linear(inner) => Linear(inner.into_proto()),
62 JoinPlan::Delta(inner) => Delta(inner.into_proto()),
63 }),
64 }
65 }
66
67 fn from_proto(value: ProtoJoinPlan) -> Result<Self, TryFromProtoError> {
68 use proto_join_plan::Kind::*;
69 let kind = value
70 .kind
71 .ok_or_else(|| TryFromProtoError::missing_field("ProtoJoinPlan::kind"))?;
72 Ok(match kind {
73 Linear(inner) => JoinPlan::Linear(inner.into_rust()?),
74 Delta(inner) => JoinPlan::Delta(inner.into_rust()?),
75 })
76 }
77}
78
/// A manual closure implementation of filtering and logic application.
///
/// This manual implementation exists to express lifetime constraints clearly,
/// as there is a relationship between the borrowed lifetime of the closed-over
/// state and the arguments it takes when invoked. It was not clear how to do
/// this with a Rust closure (glorious battle was waged, but ultimately lost).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct JoinClosure {
    /// Classes of expressions that must all evaluate to the same value for a row to pass.
    ///
    /// `apply` evaluates the first expression of each class and compares every other
    /// expression of that class against the result; the first mismatch discards the row.
    pub ready_equivalences: Vec<Vec<MirScalarExpr>>,
    /// The map/filter/project plan that `apply` evaluates after all equivalence checks
    /// have passed, writing its output into the caller-supplied `Row`.
    pub before: mz_expr::SafeMfpPlan,
}
92
93impl Arbitrary for JoinClosure {
94 type Parameters = ();
95 type Strategy = BoxedStrategy<Self>;
96
97 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
98 (
99 prop::collection::vec(prop::collection::vec(any::<MirScalarExpr>(), 0..3), 0..3),
100 any::<mz_expr::SafeMfpPlan>(),
101 )
102 .prop_map(|(ready_equivalences, before)| JoinClosure {
103 ready_equivalences,
104 before,
105 })
106 .boxed()
107 }
108}
109
110impl RustType<ProtoJoinClosure> for JoinClosure {
111 fn into_proto(&self) -> ProtoJoinClosure {
112 ProtoJoinClosure {
113 ready_equivalences: self.ready_equivalences.into_proto(),
114 before: Some(self.before.into_proto()),
115 }
116 }
117
118 fn from_proto(proto: ProtoJoinClosure) -> Result<Self, TryFromProtoError> {
119 Ok(Self {
120 ready_equivalences: proto.ready_equivalences.into_rust()?,
121 before: proto.before.into_rust_if_some("ProtoJoinClosure::before")?,
122 })
123 }
124}
125
126impl JoinClosure {
127 /// Applies per-row filtering and logic.
128 #[inline(always)]
129 pub fn apply<'a, 'row>(
130 &'a self,
131 datums: &mut Vec<Datum<'a>>,
132 temp_storage: &'a RowArena,
133 row: &'row mut Row,
134 ) -> Result<Option<&'row Row>, mz_expr::EvalError> {
135 for exprs in self.ready_equivalences.iter() {
136 // Each list of expressions should be equal to the same value.
137 let val = exprs[0].eval(&datums[..], temp_storage)?;
138 for expr in exprs[1..].iter() {
139 if expr.eval(datums, temp_storage)? != val {
140 return Ok(None);
141 }
142 }
143 }
144 self.before.evaluate_into(datums, temp_storage, row)
145 }
146
147 /// Construct an instance of the closure from available columns.
148 ///
149 /// This method updates the available columns, equivalences, and
150 /// the `MapFilterProject` instance. The columns are updated to
151 /// include reference to any columns added by the application of
152 /// this logic, which might result from partial application of
153 /// the `MapFilterProject` instance.
154 ///
155 /// If all columns are available for `mfp`, this method works
156 /// extra hard to ensure that the closure contains all the work,
157 /// and `mfp` is left as an identity transform (which can then
158 /// be ignored).
159 fn build(
160 columns: &mut BTreeMap<usize, usize>,
161 equivalences: &mut Vec<Vec<MirScalarExpr>>,
162 mfp: &mut MapFilterProject,
163 permutation: BTreeMap<usize, usize>,
164 thinned_arity_with_key: usize,
165 ) -> Self {
166 // First, determine which columns should be compared due to `equivalences`.
167 let mut ready_equivalences = Vec::new();
168 for equivalence in equivalences.iter_mut() {
169 if let Some(pos) = equivalence
170 .iter()
171 .position(|e| e.support().into_iter().all(|c| columns.contains_key(&c)))
172 {
173 let mut should_equate = Vec::new();
174 let mut cursor = pos + 1;
175 while cursor < equivalence.len() {
176 if equivalence[cursor]
177 .support()
178 .into_iter()
179 .all(|c| columns.contains_key(&c))
180 {
181 // Remove expression and equate with the first bound expression.
182 should_equate.push(equivalence.remove(cursor));
183 } else {
184 cursor += 1;
185 }
186 }
187 if !should_equate.is_empty() {
188 should_equate.push(equivalence[pos].clone());
189 ready_equivalences.push(should_equate);
190 }
191 }
192 }
193 equivalences.retain(|e| e.len() > 1);
194 let permuted_columns = columns.iter().map(|(k, v)| (*k, permutation[v])).collect();
195 // Update ready_equivalences to reference correct column locations.
196 for exprs in ready_equivalences.iter_mut() {
197 for expr in exprs.iter_mut() {
198 expr.permute_map(&permuted_columns);
199 }
200 }
201
202 // Next, partition `mfp` into `before` and `after`, the former of which can be
203 // applied now.
204 let (mut before, after) = std::mem::replace(mfp, MapFilterProject::new(mfp.input_arity))
205 .partition(columns.clone(), columns.len());
206
207 // Add any newly created columns to `columns`. These columns may be referenced
208 // by `after`, and it will be important to track their locations.
209 let bonus_columns = before.projection.len() - before.input_arity;
210 for bonus_column in 0..bonus_columns {
211 columns.insert(mfp.input_arity + bonus_column, columns.len());
212 }
213
214 *mfp = after;
215
216 // Before constructing and returning the result, we can remove output columns of `before`
217 // that are not needed in further `equivalences` or by `after` (now `mfp`).
218 let mut demand = Vec::new();
219 demand.extend(mfp.demand());
220 for equivalence in equivalences.iter() {
221 for expr in equivalence.iter() {
222 demand.extend(expr.support());
223 }
224 }
225 demand.sort();
226 demand.dedup();
227 // We only want to remove columns that are presented as outputs (i.e. can be found as in
228 // `columns`). Other columns have yet to be introduced, and we shouldn't have any opinion
229 // about them yet.
230 demand.retain(|column| columns.contains_key(column));
231 // Project `before` output columns using current locations of demanded columns.
232 before = before.project(demand.iter().map(|column| columns[column]));
233 // Update `columns` to reflect location of retained columns.
234 columns.clear();
235 for (index, column) in demand.iter().enumerate() {
236 columns.insert(*column, index);
237 }
238
239 // If `mfp` is a permutation of the columns present in `columns`, then we can
240 // apply that permutation to `before` and `columns`, so that `mfp` becomes the
241 // identity operation.
242 if mfp.expressions.is_empty()
243 && mfp.predicates.is_empty()
244 && mfp.projection.len() == columns.len()
245 && mfp.projection.iter().all(|col| columns.contains_key(col))
246 && columns.keys().all(|col| mfp.projection.contains(col))
247 {
248 // The projection we want to apply to `before` comes to us from `mfp` in the
249 // extended output column reckoning.
250 let projection = mfp
251 .projection
252 .iter()
253 .map(|col| columns[col])
254 .collect::<Vec<_>>();
255 before = before.project(projection);
256 // Update the physical locations of each output column.
257 columns.clear();
258 for (index, column) in mfp.projection.iter().enumerate() {
259 columns.insert(*column, index);
260 }
261 }
262
263 before.permute_fn(|c| permutation[&c], thinned_arity_with_key);
264
265 // `before` should not be modified after this point.
266 before.optimize();
267
268 // Cons up an instance of the closure with the closed-over state.
269 Self {
270 ready_equivalences,
271 before: before.into_plan().unwrap().into_nontemporal().unwrap(),
272 }
273 }
274
275 /// True iff the closure neither filters nor transforms records.
276 pub fn is_identity(&self) -> bool {
277 self.ready_equivalences.is_empty() && self.before.is_identity()
278 }
279
280 /// Returns true if evaluation could introduce an error on non-error inputs.
281 pub fn could_error(&self) -> bool {
282 self.before.could_error()
283 || self
284 .ready_equivalences
285 .iter()
286 .any(|es| es.iter().any(|e| e.could_error()))
287 }
288}
289
/// Maintained state as we construct join dataflows.
///
/// This state primarily tracks the *remaining* work that has not yet been applied to a
/// stream of partial results.
///
/// This state is meant to reconcile the logical operations that remain to apply (e.g.
/// filtering, expressions, projection) and the physical organization of the current stream
/// of data, whose columns may be partially assembled in non-standard locations and which
/// may already have been partially subjected to logic we need to apply.
#[derive(Debug)]
pub struct JoinBuildState {
    /// Map from expected locations in extended output column reckoning to physical locations.
    column_map: BTreeMap<usize, usize>,
    /// A list of equivalence classes of expressions.
    ///
    /// Within each equivalence class, expressions must evaluate to the same result to pass
    /// the join expression. Importantly, "the same" should be evaluated with `Datum`'s Rust
    /// equality, rather than the equality presented by the `BinaryFunc` equality operator.
    /// The distinction is important for null handling, at the least.
    equivalences: Vec<Vec<MirScalarExpr>>,
    /// The linear operator logic (maps, filters, and projection) that remains to be applied
    /// to the output of the join.
    ///
    /// When we advance through the construction of the join dataflow, we may be able to peel
    /// off some of this work, ideally reducing `mfp` to something nearly the identity.
    mfp: MapFilterProject,
}
317
318impl JoinBuildState {
319 /// Create a new join state and initial closure from initial values.
320 ///
321 /// The initial closure can be `None` which indicates that it is the identity operator.
322 fn new(
323 columns: std::ops::Range<usize>,
324 equivalences: &[Vec<MirScalarExpr>],
325 mfp: &MapFilterProject,
326 ) -> Self {
327 let mut column_map = BTreeMap::new();
328 for column in columns {
329 column_map.insert(column, column_map.len());
330 }
331 let mut equivalences = equivalences.to_vec();
332 mz_expr::canonicalize::canonicalize_equivalence_classes(&mut equivalences);
333 Self {
334 column_map,
335 equivalences,
336 mfp: mfp.clone(),
337 }
338 }
339
340 /// Present new columns and extract any newly available closure.
341 fn add_columns(
342 &mut self,
343 new_columns: std::ops::Range<usize>,
344 bound_expressions: &[MirScalarExpr],
345 thinned_arity_with_key: usize,
346 // The permutation to run on the join of the thinned collections
347 permutation: BTreeMap<usize, usize>,
348 ) -> JoinClosure {
349 // Remove each element of `bound_expressions` from `equivalences`, so that we
350 // avoid redundant predicate work. This removal also paves the way for
351 // more precise "demand" information going forward.
352 for equivalence in self.equivalences.iter_mut() {
353 equivalence.retain(|expr| !bound_expressions.contains(expr));
354 }
355 self.equivalences.retain(|e| e.len() > 1);
356
357 // Update our map of the sources of each column in the update stream.
358 for column in new_columns {
359 self.column_map.insert(column, self.column_map.len());
360 }
361
362 self.extract_closure(permutation, thinned_arity_with_key)
363 }
364
365 /// Extract a final `MapFilterProject` once all columns are available.
366 ///
367 /// If not all columns are available this method will likely panic.
368 /// This method differs from `extract_closure` in that it forcibly
369 /// completes the join, extracting projections and expressions that
370 /// may not be extracted with `extract_closure` (for example, literals,
371 /// permutations, and repetition of output columns).
372 ///
373 /// The resulting closure may be the identity operator, which can be
374 /// checked with the `is_identity()` method.
375 fn complete(self) -> JoinClosure {
376 let Self {
377 column_map,
378 mut equivalences,
379 mut mfp,
380 } = self;
381
382 for equivalence in equivalences.iter_mut() {
383 for expr in equivalence.iter_mut() {
384 expr.permute_map(&column_map);
385 }
386 }
387 let column_map_len = column_map.len();
388 mfp.permute_fn(|c| column_map[&c], column_map_len);
389 mfp.optimize();
390
391 JoinClosure {
392 ready_equivalences: equivalences,
393 before: mfp.into_plan().unwrap().into_nontemporal().unwrap(),
394 }
395 }
396
397 /// A method on `self` that extracts an available closure.
398 ///
399 /// The extracted closure is not guaranteed to be non-trivial. Sensitive users should
400 /// consider using the `.is_identity()` method to determine non-triviality.
401 fn extract_closure(
402 &mut self,
403 permutation: BTreeMap<usize, usize>,
404 thinned_arity_with_key: usize,
405 ) -> JoinClosure {
406 JoinClosure::build(
407 &mut self.column_map,
408 &mut self.equivalences,
409 &mut self.mfp,
410 permutation,
411 thinned_arity_with_key,
412 )
413 }
414}
415
// Property-based tests for the protobuf encoding of the types in this module.
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;

    use super::*;

    proptest! {
        // Run 32 generated cases per test.
        #![proptest_config(ProptestConfig::with_cases(32))]

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn join_plan_protobuf_roundtrip(expect in any::<JoinPlan>() ) {
            // Round-tripping an arbitrary `JoinPlan` through `ProtoJoinPlan` must succeed
            // and yield a value equal to the input.
            let actual = protobuf_roundtrip::<_, ProtoJoinPlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}