mz_compute_types/plan/lowering.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::BTreeMap;
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18 AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::GlobalId;
23use mz_repr::optimize::OptimizerFeatures;
24use timely::Container;
25use timely::progress::Timestamp;
26
27use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
28use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
29use crate::plan::reduce::{KeyValPlan, ReducePlan};
30use crate::plan::threshold::ThresholdPlan;
31use crate::plan::top_k::TopKPlan;
32use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
33
34pub(super) struct Context {
35 /// Known bindings to (possibly arranged) collections.
36 arrangements: BTreeMap<Id, AvailableCollections>,
37 /// Tracks the next available `LirId`.
38 next_lir_id: LirId,
39 /// Information to print along with error messages.
40 debug_info: LirDebugInfo,
41 /// Whether to enable fusion of MFPs in reductions.
42 enable_reduce_mfp_fusion: bool,
43}
44
45impl Context {
46 pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
47 Self {
48 arrangements: Default::default(),
49 next_lir_id: LirId(1),
50 debug_info: LirDebugInfo {
51 debug_name,
52 id: GlobalId::Transient(0),
53 },
54 enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
55 }
56 }
57
58 fn allocate_lir_id(&mut self) -> LirId {
59 let id = self.next_lir_id;
60 self.next_lir_id = LirId(
61 self.next_lir_id
62 .0
63 .checked_add(1)
64 .expect("No LirId overflow"),
65 );
66 id
67 }
68
69 pub fn lower<T: Timestamp>(
70 mut self,
71 desc: DataflowDescription<OptimizedMirRelationExpr>,
72 ) -> Result<DataflowDescription<Plan<T>>, String> {
73 // Sources might provide arranged forms of their data, in the future.
74 // Indexes provide arranged forms of their data.
75 for IndexImport {
76 desc: index_desc,
77 typ,
78 ..
79 } in desc.index_imports.values()
80 {
81 let key = index_desc.key.clone();
82 // TODO[btv] - We should be told the permutation by
83 // `index_desc`, and it should have been generated
84 // at the same point the thinning logic was.
85 //
86 // We should for sure do that soon, but it requires
87 // a bit of a refactor, so for now we just
88 // _assume_ that they were both generated by `permutation_for_arrangement`,
89 // and recover it here.
90 let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
91 let index_keys = self
92 .arrangements
93 .entry(Id::Global(index_desc.on_id))
94 .or_insert_with(AvailableCollections::default);
95 index_keys.arranged.push((key, permutation, thinning));
96 }
97 for id in desc.source_imports.keys() {
98 self.arrangements
99 .entry(Id::Global(*id))
100 .or_insert_with(AvailableCollections::new_raw);
101 }
102
103 // Build each object in order, registering the arrangements it forms.
104 let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
105 for build in desc.objects_to_build {
106 self.debug_info.id = build.id;
107 let (plan, keys) = self.lower_mir_expr(&build.plan)?;
108
109 self.arrangements.insert(Id::Global(build.id), keys);
110 objects_to_build.push(BuildDesc { id: build.id, plan });
111 }
112
113 Ok(DataflowDescription {
114 source_imports: desc.source_imports,
115 index_imports: desc.index_imports,
116 objects_to_build,
117 index_exports: desc.index_exports,
118 sink_exports: desc.sink_exports,
119 as_of: desc.as_of,
120 until: desc.until,
121 initial_storage_as_of: desc.initial_storage_as_of,
122 refresh_schedule: desc.refresh_schedule,
123 debug_name: desc.debug_name,
124 time_dependence: desc.time_dependence,
125 })
126 }
127
128 /// This method converts a MirRelationExpr into a plan that can be directly rendered.
129 ///
130 /// The rough structure is that we repeatedly extract map/filter/project operators
131 /// from each expression we see, bundle them up as a `MapFilterProject` object, and
132 /// then produce a plan for the combination of that with the next operator.
133 ///
134 /// The method accesses `self.arrangements`, which it will locally add to and remove from for
135 /// `Let` bindings (by the end of the call it should contain the same bindings as when it
136 /// started).
137 ///
138 /// The result of the method is both a `Plan`, but also a list of arrangements that
139 /// are certain to be produced, which can be relied on by the next steps in the plan.
140 /// Each of the arrangement keys is associated with an MFP that must be applied if that
141 /// arrangement is used, to back out the permutation associated with that arrangement.
142 ///
143 /// An empty list of arrangement keys indicates that only a `Collection` stream can
144 /// be assumed to exist.
145 fn lower_mir_expr<T: Timestamp>(
146 &mut self,
147 expr: &MirRelationExpr,
148 ) -> Result<(Plan<T>, AvailableCollections), String> {
149 // This function is recursive and can overflow its stack, so grow it if
150 // needed. The growth here is unbounded. Our general solution for this problem
151 // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
152 // depth. That however requires upstream error handling. This function is
153 // currently called by the Coordinator after calls to `catalog_transact`,
154 // and thus are not allowed to fail. Until that allows errors, we choose
155 // to allow the unbounded growth here. We are though somewhat protected by
156 // higher levels enforcing their own limits on stack depth (in the parser,
157 // transformer/desugarer, and planner).
158 mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
159 }
160
161 fn lower_mir_expr_stack_safe<T>(
162 &mut self,
163 expr: &MirRelationExpr,
164 ) -> Result<(Plan<T>, AvailableCollections), String>
165 where
166 T: Timestamp,
167 {
168 // Extract a maximally large MapFilterProject from `expr`.
169 // We will then try and push this in to the resulting expression.
170 //
171 // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
172 // While we would eventually like all plan stages to be able to absorb such
173 // general operators, not all of them can.
174 let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
175 // We attempt to plan what we have remaining, in the context of `mfp`.
176 // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
177 let (mut plan, mut keys) = match expr {
178 // These operators should have been extracted from the expression.
179 MirRelationExpr::Map { .. } => {
180 panic!("This operator should have been extracted");
181 }
182 MirRelationExpr::Filter { .. } => {
183 panic!("This operator should have been extracted");
184 }
185 MirRelationExpr::Project { .. } => {
186 panic!("This operator should have been extracted");
187 }
188 // These operators may not have been extracted, and need to result in a `Plan`.
189 MirRelationExpr::Constant { rows, typ: _ } => {
190 let lir_id = self.allocate_lir_id();
191 let node = PlanNode::Constant {
192 rows: rows.clone().map(|rows| {
193 rows.into_iter()
194 .map(|(row, diff)| (row, T::minimum(), diff))
195 .collect()
196 }),
197 };
198 // The plan, not arranged in any way.
199 (node.as_plan(lir_id), AvailableCollections::new_raw())
200 }
201 MirRelationExpr::Get { id, typ: _, .. } => {
202 // This stage can absorb arbitrary MFP operators.
203 let mut mfp = mfp.take();
204 // If `mfp` is the identity, we can surface all imported arrangements.
205 // Otherwise, we apply `mfp` and promise no arrangements.
206 let mut in_keys = self
207 .arrangements
208 .get(id)
209 .cloned()
210 .unwrap_or_else(AvailableCollections::new_raw);
211
212 // Seek out an arrangement key that might be constrained to a literal.
213 // Note: this code has very little use nowadays, as its job was mostly taken over
214 // by `LiteralConstraints` (see in the below longer comment).
215 let key_val = in_keys
216 .arranged
217 .iter()
218 .filter_map(|key| {
219 mfp.literal_constraints(&key.0)
220 .map(|val| (key.clone(), val))
221 })
222 .max_by_key(|(key, _val)| key.0.len());
223
224 // Determine the plan of action for the `Get` stage.
225 let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
226 // This code path used to handle looking up literals from indexes, but it's
227 // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
228 // MIR transform instead. However, it's still called in a couple of tricky
229 // special cases:
230 // - `LiteralConstraints` handles only Gets of global ids, so this code still
231 // gets to handle Filters on top of Gets of local ids.
232 // - Lowering does a `MapFilterProject::extract_from_expression`, while
233 // `LiteralConstraints` does
234 // `MapFilterProject::extract_non_errors_from_expr_mut`.
235 // - It might happen that new literal constraint optimization opportunities
236 // appear somewhere near the end of the MIR optimizer after
237 // `LiteralConstraints` has already run.
238 // (Also note that a similar literal constraint handling machinery is also
239 // present when handling the leftover MFP after this big match.)
240 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
241 in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
242 GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
243 } else if !mfp.is_identity() {
244 // We need to ensure a collection exists, which means we must form it.
245 if let Some((key, permutation, thinning)) =
246 in_keys.arbitrary_arrangement().cloned()
247 {
248 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
249 in_keys.arranged = vec![(key.clone(), permutation, thinning)];
250 GetPlan::Arrangement(key, None, mfp)
251 } else {
252 GetPlan::Collection(mfp)
253 }
254 } else {
255 // By default, just pass input arrangements through.
256 GetPlan::PassArrangements
257 };
258
259 let out_keys = if let GetPlan::PassArrangements = plan {
260 in_keys.clone()
261 } else {
262 AvailableCollections::new_raw()
263 };
264
265 let lir_id = self.allocate_lir_id();
266 let node = PlanNode::Get {
267 id: id.clone(),
268 keys: in_keys,
269 plan,
270 };
271 // Return the plan, and any keys if an identity `mfp`.
272 (node.as_plan(lir_id), out_keys)
273 }
274 MirRelationExpr::Let { id, value, body } => {
275 // It would be unfortunate to have a non-trivial `mfp` here, as we hope
276 // that they would be pushed down. I am not sure if we should take the
277 // initiative to push down the `mfp` ourselves.
278
279 // Plan the value using only the initial arrangements, but
280 // introduce any resulting arrangements bound to `id`.
281 let (value, v_keys) = self.lower_mir_expr(value)?;
282 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
283 assert_none!(pre_existing);
284 // Plan the body using initial and `value` arrangements,
285 // and then remove reference to the value arrangements.
286 let (body, b_keys) = self.lower_mir_expr(body)?;
287 self.arrangements.remove(&Id::Local(*id));
288 // Return the plan, and any `body` arrangements.
289 let lir_id = self.allocate_lir_id();
290 (
291 PlanNode::Let {
292 id: id.clone(),
293 value: Box::new(value),
294 body: Box::new(body),
295 }
296 .as_plan(lir_id),
297 b_keys,
298 )
299 }
300 MirRelationExpr::LetRec {
301 ids,
302 values,
303 limits,
304 body,
305 } => {
306 assert_eq!(ids.len(), values.len());
307 assert_eq!(ids.len(), limits.len());
308 // Plan the values using only the available arrangements, but
309 // introduce any resulting arrangements bound to each `id`.
310 // Arrangements made available cannot be used by prior bindings,
311 // as we cannot circulate an arrangement through a `Variable` yet.
312 let mut lir_values = Vec::with_capacity(values.len());
313 for (id, value) in ids.iter().zip_eq(values) {
314 let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
315 // If `v_keys` does not contain an unarranged collection, we must form it.
316 if !v_keys.raw {
317 // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
318 let (input_key, permutation, thinning) =
319 v_keys.arbitrary_arrangement().unwrap();
320 let mut input_mfp = MapFilterProject::new(value.arity());
321 input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
322 let input_key = Some(input_key.clone());
323
324 let forms = AvailableCollections::new_raw();
325
326 // We just want to insert an `ArrangeBy` to form an unarranged collection,
327 // but there is a complication: We shouldn't break the invariant (created by
328 // `NormalizeLets`, and relied upon by the rendering) that there isn't
329 // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
330 // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
331 // instead of on top of the inner `LetRec`.
332 lir_value = match lir_value {
333 Plan {
334 node:
335 PlanNode::LetRec {
336 ids,
337 values,
338 limits,
339 body,
340 },
341 lir_id,
342 } => {
343 let inner_lir_id = self.allocate_lir_id();
344 PlanNode::LetRec {
345 ids,
346 values,
347 limits,
348 body: Box::new(
349 PlanNode::ArrangeBy {
350 input_key,
351 input: body,
352 input_mfp,
353 forms,
354 }
355 .as_plan(inner_lir_id),
356 ),
357 }
358 .as_plan(lir_id)
359 }
360 lir_value => {
361 let lir_id = self.allocate_lir_id();
362 PlanNode::ArrangeBy {
363 input_key,
364 input: Box::new(lir_value),
365 input_mfp,
366 forms,
367 }
368 .as_plan(lir_id)
369 }
370 };
371 v_keys.raw = true;
372 }
373 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
374 assert_none!(pre_existing);
375 lir_values.push(lir_value);
376 }
377 // As we exit the iterative scope, we must leave all arrangements behind,
378 // as they reference a timestamp coordinate that must be stripped off.
379 for id in ids.iter() {
380 self.arrangements
381 .insert(Id::Local(*id), AvailableCollections::new_raw());
382 }
383 // Plan the body using initial and `value` arrangements,
384 // and then remove reference to the value arrangements.
385 let (body, b_keys) = self.lower_mir_expr(body)?;
386 for id in ids.iter() {
387 self.arrangements.remove(&Id::Local(*id));
388 }
389 // Return the plan, and any `body` arrangements.
390 let lir_id = self.allocate_lir_id();
391 (
392 PlanNode::LetRec {
393 ids: ids.clone(),
394 values: lir_values,
395 limits: limits.clone(),
396 body: Box::new(body),
397 }
398 .as_plan(lir_id),
399 b_keys,
400 )
401 }
402 MirRelationExpr::FlatMap {
403 input: flat_map_input,
404 func,
405 exprs,
406 } => {
407 // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
408 // fused into the lowered `Reduce`.
409 //
410 // In theory, we could have implemented this also as an MIR transform. However, this
411 // is more of a physical optimization, which are sometimes unpleasant to make a part
412 // of the MIR pipeline. The specific problem here with putting this into the MIR
413 // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
414 // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
415 // emit multiple rows per group. Such semantic changes of MIR are very scary, since
416 // various parts of the optimizer assume that Reduce emits only 1 row per group, and
417 // it would be very hard to hunt down all these parts. (For example, key inference
418 // infers the group key as a unique key.)
419 let fused_with_reduce = 'fusion: {
420 if !matches!(func, TableFunc::UnnestList { .. }) {
421 break 'fusion None;
422 }
423 // We might have a Project of a single col between the FlatMap and the
424 // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
425 // result of the window function.)
426 let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
427 input: project_input,
428 outputs: projection,
429 } = &**flat_map_input
430 {
431 // We want this to be a single column, because we'll want to deal with only
432 // one aggregation in the `Reduce`. (The aggregation of a window function
433 // always stands alone currently: we plan them separately from other
434 // aggregations, and Reduces are never fused. When window functions are
435 // fused with each other, they end up in one aggregation. When there are
436 // multiple window functions in the same SELECT, but can't be fused, they
437 // end up in different Reduces.)
438 if let &[single_col] = &**projection {
439 (project_input, single_col)
440 } else {
441 break 'fusion None;
442 }
443 } else {
444 (flat_map_input, 0)
445 };
446 if let MirRelationExpr::Reduce {
447 input,
448 group_key,
449 aggregates,
450 monotonic,
451 expected_group_size,
452 } = &**maybe_reduce
453 {
454 if group_key.len() != num_grouping_keys
455 || aggregates.len() != 1
456 || !aggregates[0].func.can_fuse_with_unnest_list()
457 {
458 break 'fusion None;
459 }
460 // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
461 // above the FlatMap. Later, we'll mutate this to be the residual MFP that
462 // didn't get fused into the `Reduce`.
463 let non_fused_mfp_above_flat_map = &mut mfp;
464 let reduce_output_arity = num_grouping_keys + 1;
465 // We are fusing away the list that the FlatMap would have been unnesting,
466 // so the column that had that list disappears, so we have to permute the
467 // MFP above the FlatMap with this column disappearance.
468 let tweaked_mfp = {
469 let mut mfp = non_fused_mfp_above_flat_map.clone();
470 if mfp.demand().contains(&0) {
471 // I don't think this can happen currently that this MFP would
472 // refer to the list column, because both the list column and the
473 // MFP were constructed by the HIR-to-MIR lowering, so it's not just
474 // some random MFP that we are seeing here. But anyhow, it's better
475 // to check this here for robustness against future code changes.
476 break 'fusion None;
477 }
478 let permutation: BTreeMap<_, _> =
479 (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
480 mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
481 mfp
482 };
483 // We now put together the project that was before the FlatMap, and the
484 // tweaked version of the MFP that was after the FlatMap.
485 // (Part of this MFP might be fused into the Reduce.)
486 let mut project_and_tweaked_mfp = {
487 let mut mfp = MapFilterProject::new(reduce_output_arity);
488 mfp = mfp.project(vec![num_grouping_keys]);
489 mfp = MapFilterProject::compose(mfp, tweaked_mfp);
490 mfp
491 };
492 let fused = self.lower_reduce(
493 input,
494 group_key,
495 aggregates,
496 monotonic,
497 expected_group_size,
498 &mut project_and_tweaked_mfp,
499 true,
500 )?;
501 // Update the residual MFP.
502 *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
503 Some(fused)
504 } else {
505 break 'fusion None;
506 }
507 };
508 if let Some(fused_with_reduce) = fused_with_reduce {
509 fused_with_reduce
510 } else {
511 // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
512 let (input, keys) = self.lower_mir_expr(flat_map_input)?;
513 // This stage can absorb arbitrary MFP instances.
514 let mfp = mfp.take();
515 let mut exprs = exprs.clone();
516 let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
517 {
518 // We don't permute the MFP here, because it runs _after_ the table function,
519 // whose output is in a fixed order.
520 //
521 // We _do_, however, need to permute the `expr`s that provide input to the
522 // `func`.
523 let permutation = permutation.iter().cloned().enumerate().collect();
524 for expr in &mut exprs {
525 expr.permute_map(&permutation);
526 }
527
528 Some(k.clone())
529 } else {
530 None
531 };
532
533 let lir_id = self.allocate_lir_id();
534 // Return the plan, and no arrangements.
535 (
536 PlanNode::FlatMap {
537 input_key,
538 input: Box::new(input),
539 exprs: exprs.clone(),
540 func: func.clone(),
541 mfp_after: mfp,
542 }
543 .as_plan(lir_id),
544 AvailableCollections::new_raw(),
545 )
546 }
547 }
548 MirRelationExpr::Join {
549 inputs,
550 equivalences,
551 implementation,
552 } => {
553 // Plan each of the join inputs independently.
554 // The `plans` get surfaced upwards, and the `input_keys` should
555 // be used as part of join planning / to validate the existing
556 // plans / to aid in indexed seeding of update streams.
557 let mut plans = Vec::new();
558 let mut input_keys = Vec::new();
559 let mut input_arities = Vec::new();
560 for input in inputs.iter() {
561 let (plan, keys) = self.lower_mir_expr(input)?;
562 input_arities.push(input.arity());
563 plans.push(plan);
564 input_keys.push(keys);
565 }
566
567 let input_mapper =
568 JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
569
570 // Extract temporal predicates as joins cannot currently absorb them.
571 let (plan, missing) = match implementation {
572 IndexedFilter(_coll_id, _idx_id, key, _val) => {
573 // Start with the constant input. (This used to be important before database-issues#4016
574 // was fixed.)
575 let start: usize = 1;
576 let order = vec![(0usize, key.clone(), None)];
577 // All columns of the constant input will be part of the arrangement key.
578 let source_arrangement = (
579 (0..key.len())
580 .map(MirScalarExpr::column)
581 .collect::<Vec<_>>(),
582 (0..key.len()).collect::<Vec<_>>(),
583 Vec::<usize>::new(),
584 );
585 let (ljp, missing) = LinearJoinPlan::create_from(
586 start,
587 Some(&source_arrangement),
588 equivalences,
589 &order,
590 input_mapper,
591 &mut mfp,
592 &input_keys,
593 );
594 (JoinPlan::Linear(ljp), missing)
595 }
596 Differential((start, start_arr, _start_characteristic), order) => {
597 let source_arrangement = start_arr.as_ref().and_then(|key| {
598 input_keys[*start]
599 .arranged
600 .iter()
601 .find(|(k, _, _)| k == key)
602 .clone()
603 });
604 let (ljp, missing) = LinearJoinPlan::create_from(
605 *start,
606 source_arrangement,
607 equivalences,
608 order,
609 input_mapper,
610 &mut mfp,
611 &input_keys,
612 );
613 (JoinPlan::Linear(ljp), missing)
614 }
615 DeltaQuery(orders) => {
616 let (djp, missing) = DeltaJoinPlan::create_from(
617 equivalences,
618 orders,
619 input_mapper,
620 &mut mfp,
621 &input_keys,
622 );
623 (JoinPlan::Delta(djp), missing)
624 }
625 // Other plans are errors, and should be reported as such.
626 Unimplemented => return Err("unimplemented join".to_string()),
627 };
628 // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
629 // `missing`. We thus need to plan them here so they'll exist.
630 let is_delta = matches!(plan, JoinPlan::Delta(_));
631 for (((input_plan, input_keys), missing), arity) in plans
632 .iter_mut()
633 .zip_eq(input_keys.iter())
634 .zip_eq(missing.into_iter())
635 .zip_eq(input_arities.iter().cloned())
636 {
637 if missing != Default::default() {
638 if is_delta {
639 // join_implementation.rs produced a sub-optimal plan here;
640 // we shouldn't plan delta joins at all if not all of the required
641 // arrangements are available. Soft panic in CI and log an error in
642 // production to increase the chances that we will catch all situations
643 // that violate this constraint.
644 soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
645Dataflow info: {}
646This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
647 } else {
648 soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
649Dataflow info: {}
650This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
651 // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
652 // Join input. (Earlier, they were missing in the following cases:
653 // - They were const-folded away for constant inputs. This is not
654 // happening since
655 // https://github.com/MaterializeInc/materialize/pull/16351
656 // - They were not being inserted for the constant input of
657 // `IndexedFilter`s. This was fixed in
658 // https://github.com/MaterializeInc/materialize/pull/20920
659 // - They were not being inserted for the first input of Differential
660 // joins. This was fixed in
661 // https://github.com/MaterializeInc/materialize/pull/16099)
662 }
663 let lir_id = self.allocate_lir_id();
664 let raw_plan = std::mem::replace(
665 input_plan,
666 PlanNode::Constant {
667 rows: Ok(Vec::new()),
668 }
669 .as_plan(lir_id),
670 );
671 *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
672 }
673 }
674 // Return the plan, and no arrangements.
675 let lir_id = self.allocate_lir_id();
676 (
677 PlanNode::Join {
678 inputs: plans,
679 plan,
680 }
681 .as_plan(lir_id),
682 AvailableCollections::new_raw(),
683 )
684 }
685 MirRelationExpr::Reduce {
686 input,
687 group_key,
688 aggregates,
689 monotonic,
690 expected_group_size,
691 } => {
692 if aggregates
693 .iter()
694 .any(|agg| agg.func.can_fuse_with_unnest_list())
695 {
696 // This case should have been handled at the `MirRelationExpr::FlatMap` case
697 // above. But that has a pretty complicated pattern matching, so it's not
698 // unthinkable that it fails.
699 soft_panic_or_log!(
700 "Window function performance issue: `reduce_unnest_list_fusion` failed"
701 );
702 }
703 self.lower_reduce(
704 input,
705 group_key,
706 aggregates,
707 monotonic,
708 expected_group_size,
709 &mut mfp,
710 false,
711 )?
712 }
713 MirRelationExpr::TopK {
714 input,
715 group_key,
716 order_key,
717 limit,
718 offset,
719 monotonic,
720 expected_group_size,
721 } => {
722 let arity = input.arity();
723 let (input, keys) = self.lower_mir_expr(input)?;
724
725 let top_k_plan = TopKPlan::create_from(
726 group_key.clone(),
727 order_key.clone(),
728 *offset,
729 limit.clone(),
730 arity,
731 *monotonic,
732 *expected_group_size,
733 );
734
735 // We don't have an MFP here -- install an operator to permute the
736 // input, if necessary.
737 let input = if !keys.raw {
738 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
739 } else {
740 input
741 };
742 // Return the plan, and no arrangements.
743 let lir_id = self.allocate_lir_id();
744 (
745 PlanNode::TopK {
746 input: Box::new(input),
747 top_k_plan,
748 }
749 .as_plan(lir_id),
750 AvailableCollections::new_raw(),
751 )
752 }
753 MirRelationExpr::Negate { input } => {
754 let arity = input.arity();
755 let (input, keys) = self.lower_mir_expr(input)?;
756
757 // We don't have an MFP here -- install an operator to permute the
758 // input, if necessary.
759 let input = if !keys.raw {
760 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
761 } else {
762 input
763 };
764 // Return the plan, and no arrangements.
765 let lir_id = self.allocate_lir_id();
766 (
767 PlanNode::Negate {
768 input: Box::new(input),
769 }
770 .as_plan(lir_id),
771 AvailableCollections::new_raw(),
772 )
773 }
774 MirRelationExpr::Threshold { input } => {
775 let (plan, keys) = self.lower_mir_expr(input)?;
776 let arity = input.arity();
777 let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
778 let plan = if !keys
779 .arranged
780 .iter()
781 .any(|(key, _, _)| key == &required_arrangement.0)
782 {
783 self.arrange_by(
784 plan,
785 AvailableCollections::new_arranged(vec![required_arrangement]),
786 &keys,
787 arity,
788 )
789 } else {
790 plan
791 };
792
793 let output_keys = threshold_plan.keys();
794 // Return the plan, and any produced keys.
795 let lir_id = self.allocate_lir_id();
796 (
797 PlanNode::Threshold {
798 input: Box::new(plan),
799 threshold_plan,
800 }
801 .as_plan(lir_id),
802 output_keys,
803 )
804 }
805 MirRelationExpr::Union { base, inputs } => {
806 let arity = base.arity();
807 let mut plans_keys = Vec::with_capacity(1 + inputs.len());
808 let (plan, keys) = self.lower_mir_expr(base)?;
809 plans_keys.push((plan, keys));
810 for input in inputs.iter() {
811 let (plan, keys) = self.lower_mir_expr(input)?;
812 plans_keys.push((plan, keys));
813 }
814 let plans = plans_keys
815 .into_iter()
816 .map(|(plan, keys)| {
817 // We don't have an MFP here -- install an operator to permute the
818 // input, if necessary.
819 if !keys.raw {
820 self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
821 } else {
822 plan
823 }
824 })
825 .collect();
826 // Return the plan and no arrangements.
827 let lir_id = self.allocate_lir_id();
828 (
829 PlanNode::Union {
830 inputs: plans,
831 consolidate_output: false,
832 }
833 .as_plan(lir_id),
834 AvailableCollections::new_raw(),
835 )
836 }
837 MirRelationExpr::ArrangeBy { input, keys } => {
838 let input_mir = input;
839 let (input, mut input_keys) = self.lower_mir_expr(input)?;
840 // Fill the `types` in `input_keys` if not already present.
841 let arity = input_mir.arity();
842
843 // Determine keys that are not present in `input_keys`.
844 let new_keys = keys
845 .iter()
846 .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
847 .cloned()
848 .collect::<Vec<_>>();
849 if new_keys.is_empty() {
850 (input, input_keys)
851 } else {
852 let mut new_keys = new_keys
853 .iter()
854 .cloned()
855 .map(|k| {
856 let (permutation, thinning) = permutation_for_arrangement(&k, arity);
857 (k, permutation, thinning)
858 })
859 .collect::<Vec<_>>();
860 let forms = AvailableCollections {
861 raw: input_keys.raw,
862 arranged: new_keys.clone(),
863 };
864 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
865 input_keys.arbitrary_arrangement()
866 {
867 let mut mfp = MapFilterProject::new(arity);
868 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
869 (Some(input_key.clone()), mfp)
870 } else {
871 (None, MapFilterProject::new(arity))
872 };
873 input_keys.arranged.append(&mut new_keys);
874 input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
875
876 // Return the plan and extended keys.
877 let lir_id = self.allocate_lir_id();
878 (
879 PlanNode::ArrangeBy {
880 input_key,
881 input: Box::new(input),
882 input_mfp,
883 forms,
884 }
885 .as_plan(lir_id),
886 input_keys,
887 )
888 }
889 }
890 };
891
892 // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
893 if !mfp.is_identity() {
894 // Seek out an arrangement key that might be constrained to a literal.
895 // TODO: Improve key selection heuristic.
896 let key_val = keys
897 .arranged
898 .iter()
899 .filter_map(|(key, permutation, thinning)| {
900 let mut mfp = mfp.clone();
901 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
902 mfp.literal_constraints(key)
903 .map(|val| (key.clone(), permutation, thinning, val))
904 })
905 .max_by_key(|(key, _, _, _)| key.len());
906
907 // Input key selection strategy:
908 // (1) If we can read a key at a particular value, do so
909 // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
910 // therefore allows us to avoid discarding the arrangement, use that.
911 // (3) Otherwise, if there is _some_ key, use that,
912 // (4) Otherwise just read the raw collection.
913 let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
914 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
915
916 Some((key, Some(val)))
917 } else if let Some((key, permutation, thinning)) =
918 keys.arranged.iter().find(|(key, permutation, thinning)| {
919 let mut mfp = mfp.clone();
920 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
921 mfp.is_identity()
922 })
923 {
924 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
925 Some((key.clone(), None))
926 } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
927 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
928 Some((key.clone(), None))
929 } else {
930 None
931 };
932
933 if mfp.is_identity() {
934 // We have discovered a key
935 // whose permutation causes the MFP to actually
936 // be the identity! We can keep it around,
937 // but without its permutation this time,
938 // and with a trivial thinning of the right length.
939 let (key, val) = input_key_val.unwrap();
940 let (_old_key, old_permutation, old_thinning) = keys
941 .arranged
942 .iter_mut()
943 .find(|(key2, _, _)| key2 == &key)
944 .unwrap();
945 *old_permutation = (0..mfp.input_arity).collect();
946 let old_thinned_arity = old_thinning.len();
947 *old_thinning = (0..old_thinned_arity).collect();
948 // Get rid of all other forms, as this is now the only one known to be valid.
949 // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
950 // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
951 keys.arranged.retain(|(key2, _, _)| key2 == &key);
952 keys.raw = false;
953
954 // Creating a Plan::Mfp node is now logically unnecessary, but we
955 // should do so anyway when `val` is populated, so that
956 // the `key_val` optimization gets applied.
957 let lir_id = self.allocate_lir_id();
958 if val.is_some() {
959 plan = PlanNode::Mfp {
960 input: Box::new(plan),
961 mfp,
962 input_key_val: Some((key, val)),
963 }
964 .as_plan(lir_id)
965 }
966 } else {
967 let lir_id = self.allocate_lir_id();
968 plan = PlanNode::Mfp {
969 input: Box::new(plan),
970 mfp,
971 input_key_val,
972 }
973 .as_plan(lir_id);
974 keys = AvailableCollections::new_raw();
975 }
976 }
977
978 Ok((plan, keys))
979 }
980
981 /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
982 /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
983 /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
984 /// fused.
985 fn lower_reduce<T: Timestamp>(
986 &mut self,
987 input: &MirRelationExpr,
988 group_key: &Vec<MirScalarExpr>,
989 aggregates: &Vec<AggregateExpr>,
990 monotonic: &bool,
991 expected_group_size: &Option<u64>,
992 mfp_on_top: &mut MapFilterProject,
993 fused_unnest_list: bool,
994 ) -> Result<(Plan<T>, AvailableCollections), String> {
995 let input_arity = input.arity();
996 let (input, keys) = self.lower_mir_expr(input)?;
997 let (input_key, permutation_and_new_arity) =
998 if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
999 (
1000 Some(input_key.clone()),
1001 Some((permutation.clone(), thinning.len() + input_key.len())),
1002 )
1003 } else {
1004 (None, None)
1005 };
1006 let key_val_plan = KeyValPlan::new(
1007 input_arity,
1008 group_key,
1009 aggregates,
1010 permutation_and_new_arity,
1011 );
1012 let reduce_plan = ReducePlan::create_from(
1013 aggregates.clone(),
1014 *monotonic,
1015 *expected_group_size,
1016 fused_unnest_list,
1017 );
1018 // Return the plan, and the keys it produces.
1019 let mfp_after;
1020 let output_arity;
1021 if self.enable_reduce_mfp_fusion {
1022 (mfp_after, *mfp_on_top, output_arity) =
1023 reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1024 } else {
1025 (mfp_after, output_arity) = (
1026 MapFilterProject::new(mfp_on_top.input_arity),
1027 group_key.len() + aggregates.len(),
1028 );
1029 }
1030 soft_assert_eq_or_log!(
1031 mfp_on_top.input_arity,
1032 output_arity,
1033 "Output arity of reduce must match input arity for MFP on top of it"
1034 );
1035 let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1036 let lir_id = self.allocate_lir_id();
1037 Ok((
1038 PlanNode::Reduce {
1039 input_key,
1040 input: Box::new(input),
1041 key_val_plan,
1042 plan: reduce_plan,
1043 mfp_after,
1044 }
1045 .as_plan(lir_id),
1046 output_keys,
1047 ))
1048 }
1049
1050 /// Replace the plan with another one
1051 /// that has the collection in some additional forms.
1052 pub fn arrange_by<T>(
1053 &mut self,
1054 plan: Plan<T>,
1055 collections: AvailableCollections,
1056 old_collections: &AvailableCollections,
1057 arity: usize,
1058 ) -> Plan<T> {
1059 if let Plan {
1060 node:
1061 PlanNode::ArrangeBy {
1062 input_key,
1063 input,
1064 input_mfp,
1065 mut forms,
1066 },
1067 lir_id,
1068 } = plan
1069 {
1070 forms.raw |= collections.raw;
1071 forms.arranged.extend(collections.arranged);
1072 forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1073 forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1074 PlanNode::ArrangeBy {
1075 input_key,
1076 input,
1077 input_mfp,
1078 forms,
1079 }
1080 .as_plan(lir_id)
1081 } else {
1082 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1083 old_collections.arbitrary_arrangement()
1084 {
1085 let mut mfp = MapFilterProject::new(arity);
1086 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1087 (Some(input_key.clone()), mfp)
1088 } else {
1089 (None, MapFilterProject::new(arity))
1090 };
1091 let lir_id = self.allocate_lir_id();
1092
1093 PlanNode::ArrangeBy {
1094 input_key,
1095 input: Box::new(plan),
1096 input_mfp,
1097 forms: collections,
1098 }
1099 .as_plan(lir_id)
1100 }
1101 }
1102}
1103
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
///
/// Rendered by its `Display` impl as `Debug name: <debug_name>; id: <id>`.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Name passed to `Context::new`. Presumably this is the debug name of the dataflow
    /// being lowered (NOTE(review): confirm at call sites).
    debug_name: String,
    /// Id reported alongside `debug_name`. `Context::new` initializes it to
    /// `GlobalId::Transient(0)`. Presumably it is later set to the id of the object
    /// currently being lowered (NOTE(review): confirm where it is assigned).
    id: GlobalId,
}
1111
1112impl std::fmt::Display for LirDebugInfo {
1113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1114 write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1115 }
1116}