mz_compute_types/plan/lowering.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18 AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::optimize::OptimizerFeatures;
23use mz_repr::{GlobalId, Timestamp};
24
25use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
26use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
27use crate::plan::reduce::{KeyValPlan, ReducePlan};
28use crate::plan::threshold::ThresholdPlan;
29use crate::plan::top_k::TopKPlan;
30use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Plan, PlanNode};
31
32/// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped
33/// updates. Future updates are the only case where temporal bucketing pays off.
34fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy {
35 if has_future_updates {
36 ArrangementStrategy::TemporalBucketing
37 } else {
38 ArrangementStrategy::Direct
39 }
40}
41
/// The result of lowering a [`MirRelationExpr`] to a [`Plan`].
///
/// Returned by `Context::lower_mir_expr` for every (sub)expression it lowers. Callers
/// use `keys` to decide whether they must form additional arrangements on top of
/// `plan`, and `has_future_updates` to pick the strategy for any arrangement they form
/// (see `strategy_from_future`).
struct LoweredExpr {
    /// The lowered plan.
    plan: Plan,
    /// The arrangement keys that the plan is certain to produce.
    ///
    /// Anything not listed here must be formed explicitly by the caller (e.g., via
    /// `arrange_by`) before it can be relied upon.
    keys: AvailableCollections,
    /// Whether the plan's output may contain updates at future timestamps,
    /// e.g., from a temporal MFP using `mz_now()`.
    has_future_updates: bool,
}
52
/// State threaded through the lowering of one [`DataflowDescription`] from MIR to LIR.
///
/// Created with `Context::new` and consumed by `Context::lower`.
pub(super) struct Context {
    /// Known bindings to (possibly arranged) collections.
    ///
    /// Keyed by the global ids of imported sources/indexes and of objects already
    /// built in this dataflow, plus the local ids of `Let`/`LetRec` bindings that are
    /// currently in scope.
    arrangements: BTreeMap<Id, AvailableCollections>,
    /// Ids whose collections may contain updates at future timestamps,
    /// e.g., from a temporal MFP using `mz_now()`.
    ///
    /// NOTE(review): in the code visible here only built objects and in-scope local
    /// bindings are recorded; imported sources/indexes are never added, so an import
    /// of a temporal view is treated as having no future updates. Confirm this is
    /// intended.
    has_future_updates: BTreeSet<Id>,
    /// Tracks the next available `LirId`.
    ///
    /// Starts at `LirId(1)` and is advanced by `allocate_lir_id`.
    next_lir_id: LirId,
    /// Information to print along with error messages.
    ///
    /// Its `id` is updated to the object currently being lowered.
    debug_info: LirDebugInfo,
    /// Whether to enable fusion of MFPs in reductions.
    ///
    /// Copied from `OptimizerFeatures::enable_reduce_mfp_fusion` at construction.
    enable_reduce_mfp_fusion: bool,
}
66
67impl Context {
68 pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
69 Self {
70 arrangements: Default::default(),
71 has_future_updates: Default::default(),
72 next_lir_id: LirId(1),
73 debug_info: LirDebugInfo {
74 debug_name,
75 id: GlobalId::Transient(0),
76 },
77 enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
78 }
79 }
80
81 fn allocate_lir_id(&mut self) -> LirId {
82 let id = self.next_lir_id;
83 self.next_lir_id = LirId(
84 self.next_lir_id
85 .0
86 .checked_add(1)
87 .expect("No LirId overflow"),
88 );
89 id
90 }
91
92 pub fn lower(
93 mut self,
94 desc: DataflowDescription<OptimizedMirRelationExpr>,
95 ) -> Result<DataflowDescription<Plan>, String> {
96 // Sources might provide arranged forms of their data, in the future.
97 // Indexes provide arranged forms of their data.
98 for IndexImport {
99 desc: index_desc,
100 typ,
101 ..
102 } in desc.index_imports.values()
103 {
104 let key = index_desc.key.clone();
105 // TODO[btv] - We should be told the permutation by
106 // `index_desc`, and it should have been generated
107 // at the same point the thinning logic was.
108 //
109 // We should for sure do that soon, but it requires
110 // a bit of a refactor, so for now we just
111 // _assume_ that they were both generated by `permutation_for_arrangement`,
112 // and recover it here.
113 let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
114 let index_keys = self
115 .arrangements
116 .entry(Id::Global(index_desc.on_id))
117 .or_insert_with(AvailableCollections::default);
118 index_keys.arranged.push((key, permutation, thinning));
119 }
120 for id in desc.source_imports.keys() {
121 self.arrangements
122 .entry(Id::Global(*id))
123 .or_insert_with(AvailableCollections::new_raw);
124 }
125
126 // Build each object in order, registering the arrangements it forms.
127 let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
128 for build in desc.objects_to_build {
129 self.debug_info.id = build.id;
130 let LoweredExpr {
131 plan,
132 keys,
133 has_future_updates,
134 } = self.lower_mir_expr(&build.plan)?;
135
136 self.arrangements.insert(Id::Global(build.id), keys);
137 if has_future_updates {
138 self.has_future_updates.insert(Id::Global(build.id));
139 }
140 objects_to_build.push(BuildDesc { id: build.id, plan });
141 }
142
143 Ok(DataflowDescription {
144 source_imports: desc.source_imports,
145 index_imports: desc.index_imports,
146 objects_to_build,
147 index_exports: desc.index_exports,
148 sink_exports: desc.sink_exports,
149 as_of: desc.as_of,
150 until: desc.until,
151 initial_storage_as_of: desc.initial_storage_as_of,
152 refresh_schedule: desc.refresh_schedule,
153 debug_name: desc.debug_name,
154 time_dependence: desc.time_dependence,
155 })
156 }
157
158 /// This method converts a MirRelationExpr into a plan that can be directly rendered.
159 ///
160 /// The rough structure is that we repeatedly extract map/filter/project operators
161 /// from each expression we see, bundle them up as a `MapFilterProject` object, and
162 /// then produce a plan for the combination of that with the next operator.
163 ///
164 /// The method accesses `self.arrangements`, which it will locally add to and remove from for
165 /// `Let` bindings (by the end of the call it should contain the same bindings as when it
166 /// started).
167 ///
168 /// The result of the method is both a `Plan`, but also a list of arrangements that
169 /// are certain to be produced, which can be relied on by the next steps in the plan.
170 /// Each of the arrangement keys is associated with an MFP that must be applied if that
171 /// arrangement is used, to back out the permutation associated with that arrangement.
172 ///
173 /// An empty list of arrangement keys indicates that only a `Collection` stream can
174 /// be assumed to exist.
175 fn lower_mir_expr(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
176 // This function is recursive and can overflow its stack, so grow it if
177 // needed. The growth here is unbounded. Our general solution for this problem
178 // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
179 // depth. That however requires upstream error handling. This function is
180 // currently called by the Coordinator after calls to `catalog_transact`,
181 // and thus are not allowed to fail. Until that allows errors, we choose
182 // to allow the unbounded growth here. We are though somewhat protected by
183 // higher levels enforcing their own limits on stack depth (in the parser,
184 // transformer/desugarer, and planner).
185 mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
186 }
187
188 fn lower_mir_expr_stack_safe(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
189 // Extract a maximally large MapFilterProject from `expr`.
190 // We will then try and push this in to the resulting expression.
191 //
192 // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
193 // While we would eventually like all plan stages to be able to absorb such
194 // general operators, not all of them can.
195 let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
196 // We attempt to plan what we have remaining, in the context of `mfp`.
197 // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
198 let LoweredExpr {
199 mut plan,
200 mut keys,
201 mut has_future_updates,
202 } = match expr {
203 // These operators should have been extracted from the expression.
204 MirRelationExpr::Map { .. } => {
205 panic!("This operator should have been extracted");
206 }
207 MirRelationExpr::Filter { .. } => {
208 panic!("This operator should have been extracted");
209 }
210 MirRelationExpr::Project { .. } => {
211 panic!("This operator should have been extracted");
212 }
213 // These operators may not have been extracted, and need to result in a `Plan`.
214 MirRelationExpr::Constant { rows, typ: _ } => {
215 let lir_id = self.allocate_lir_id();
216 let node = PlanNode::Constant {
217 rows: rows.clone().map(|rows| {
218 rows.into_iter()
219 .map(|(row, diff)| (row, Timestamp::MIN, diff))
220 .collect()
221 }),
222 };
223 // The plan, not arranged in any way.
224 LoweredExpr {
225 plan: node.as_plan(lir_id),
226 keys: AvailableCollections::new_raw(),
227 has_future_updates: false,
228 }
229 }
230 MirRelationExpr::Get { id, typ: _, .. } => {
231 // This stage can absorb arbitrary MFP operators.
232 let mut mfp = mfp.take();
233 // If `mfp` is the identity, we can surface all imported arrangements.
234 // Otherwise, we apply `mfp` and promise no arrangements.
235 let mut in_keys = self
236 .arrangements
237 .get(id)
238 .cloned()
239 .unwrap_or_else(AvailableCollections::new_raw);
240
241 // Seek out an arrangement key that might be constrained to a literal.
242 // Note: this code has very little use nowadays, as its job was mostly taken over
243 // by `LiteralConstraints` (see in the below longer comment).
244 let key_val = in_keys
245 .arranged
246 .iter()
247 .filter_map(|key| {
248 mfp.literal_constraints(&key.0)
249 .map(|val| (key.clone(), val))
250 })
251 .max_by_key(|(key, _val)| key.0.len());
252
253 // Determine the plan of action for the `Get` stage.
254 let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
255 // This code path used to handle looking up literals from indexes, but it's
256 // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
257 // MIR transform instead. However, it's still called in a couple of tricky
258 // special cases:
259 // - `LiteralConstraints` handles only Gets of global ids, so this code still
260 // gets to handle Filters on top of Gets of local ids.
261 // - Lowering does a `MapFilterProject::extract_from_expression`, while
262 // `LiteralConstraints` does
263 // `MapFilterProject::extract_non_errors_from_expr_mut`.
264 // - It might happen that new literal constraint optimization opportunities
265 // appear somewhere near the end of the MIR optimizer after
266 // `LiteralConstraints` has already run.
267 // (Also note that a similar literal constraint handling machinery is also
268 // present when handling the leftover MFP after this big match.)
269 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
270 in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
271 GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
272 } else if !mfp.is_identity() {
273 // We need to ensure a collection exists, which means we must form it.
274 if let Some((key, permutation, thinning)) =
275 in_keys.arbitrary_arrangement().cloned()
276 {
277 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
278 in_keys.arranged = vec![(key.clone(), permutation, thinning)];
279 GetPlan::Arrangement(key, None, mfp)
280 } else {
281 GetPlan::Collection(mfp)
282 }
283 } else {
284 // By default, just pass input arrangements through.
285 GetPlan::PassArrangements
286 };
287
288 let out_keys = if let GetPlan::PassArrangements = plan {
289 in_keys.clone()
290 } else {
291 AvailableCollections::new_raw()
292 };
293
294 // Even with a non-temporal MFP, we must propagate `has_future_updates`
295 // from the underlying binding — applying an MFP doesn't drop future-
296 // timestamped updates that already exist on the input.
297 let has_future_updates = self.has_future_updates.contains(id)
298 || match &plan {
299 GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => {
300 mfp.has_temporal_predicates()
301 }
302 GetPlan::PassArrangements => false,
303 };
304
305 let lir_id = self.allocate_lir_id();
306 let node = PlanNode::Get {
307 id: id.clone(),
308 keys: in_keys,
309 plan,
310 };
311 // Return the plan, and any keys if an identity `mfp`.
312 LoweredExpr {
313 plan: node.as_plan(lir_id),
314 keys: out_keys,
315 has_future_updates,
316 }
317 }
318 MirRelationExpr::Let { id, value, body } => {
319 // It would be unfortunate to have a non-trivial `mfp` here, as we hope
320 // that they would be pushed down. I am not sure if we should take the
321 // initiative to push down the `mfp` ourselves.
322
323 // Plan the value using only the initial arrangements, but
324 // introduce any resulting arrangements bound to `id`.
325 let LoweredExpr {
326 plan: value,
327 keys: v_keys,
328 has_future_updates: v_future,
329 } = self.lower_mir_expr(value)?;
330 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
331 assert_none!(pre_existing);
332 if v_future {
333 self.has_future_updates.insert(Id::Local(*id));
334 }
335 // Plan the body using initial and `value` arrangements,
336 // and then remove reference to the value arrangements.
337 let LoweredExpr {
338 plan: body,
339 keys: b_keys,
340 has_future_updates: b_future,
341 } = self.lower_mir_expr(body)?;
342 self.arrangements.remove(&Id::Local(*id));
343 self.has_future_updates.remove(&Id::Local(*id));
344 // Return the plan, and any `body` arrangements.
345 let lir_id = self.allocate_lir_id();
346 LoweredExpr {
347 plan: PlanNode::Let {
348 id: id.clone(),
349 value: Box::new(value),
350 body: Box::new(body),
351 }
352 .as_plan(lir_id),
353 keys: b_keys,
354 has_future_updates: b_future,
355 }
356 }
357 MirRelationExpr::LetRec {
358 ids,
359 values,
360 limits,
361 body,
362 } => {
363 assert_eq!(ids.len(), values.len());
364 assert_eq!(ids.len(), limits.len());
365 // Plan the values using only the available arrangements, but
366 // introduce any resulting arrangements bound to each `id`.
367 // Arrangements made available cannot be used by prior bindings,
368 // as we cannot circulate an arrangement through a `Variable` yet.
369 let mut lir_values = Vec::with_capacity(values.len());
370 let mut any_v_future = false;
371 for (id, value) in ids.iter().zip_eq(values) {
372 let LoweredExpr {
373 plan: mut lir_value,
374 keys: mut v_keys,
375 has_future_updates: v_future,
376 } = self.lower_mir_expr(value)?;
377 any_v_future |= v_future;
378 // If `v_keys` does not contain an unarranged collection, we must form it.
379 if !v_keys.raw {
380 // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
381 let (input_key, permutation, thinning) =
382 v_keys.arbitrary_arrangement().unwrap();
383 let mut input_mfp = MapFilterProject::new(value.arity());
384 input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
385 let input_key = Some(input_key.clone());
386
387 let forms = AvailableCollections::new_raw();
388
389 // We just want to insert an `ArrangeBy` to form an unarranged collection,
390 // but there is a complication: We shouldn't break the invariant (created by
391 // `NormalizeLets`, and relied upon by the rendering) that there isn't
392 // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
393 // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
394 // instead of on top of the inner `LetRec`.
395 //
396 // We forward `v_future` for honesty; bucketing has no observable effect
397 // inside an iterative scope, but the field should reflect reality.
398 lir_value = match lir_value {
399 Plan {
400 node:
401 PlanNode::LetRec {
402 ids,
403 values,
404 limits,
405 body,
406 },
407 lir_id,
408 } => {
409 let inner_lir_id = self.allocate_lir_id();
410 PlanNode::LetRec {
411 ids,
412 values,
413 limits,
414 body: Box::new(
415 PlanNode::ArrangeBy {
416 input_key,
417 input: body,
418 input_mfp,
419 forms,
420 strategy: strategy_from_future(v_future),
421 }
422 .as_plan(inner_lir_id),
423 ),
424 }
425 .as_plan(lir_id)
426 }
427 lir_value => {
428 let lir_id = self.allocate_lir_id();
429 PlanNode::ArrangeBy {
430 input_key,
431 input: Box::new(lir_value),
432 input_mfp,
433 forms,
434 strategy: strategy_from_future(v_future),
435 }
436 .as_plan(lir_id)
437 }
438 };
439 v_keys.raw = true;
440 }
441 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
442 assert_none!(pre_existing);
443 if v_future {
444 self.has_future_updates.insert(Id::Local(*id));
445 }
446 lir_values.push(lir_value);
447 }
448 // As we exit the iterative scope, we must leave all arrangements behind,
449 // as they reference a timestamp coordinate that must be stripped off.
450 for id in ids.iter() {
451 self.arrangements
452 .insert(Id::Local(*id), AvailableCollections::new_raw());
453 }
454 // Plan the body using initial and `value` arrangements,
455 // and then remove reference to the value arrangements.
456 let LoweredExpr {
457 plan: body,
458 keys: b_keys,
459 has_future_updates: b_future,
460 } = self.lower_mir_expr(body)?;
461 for id in ids.iter() {
462 self.arrangements.remove(&Id::Local(*id));
463 self.has_future_updates.remove(&Id::Local(*id));
464 }
465 // Return the plan, and any `body` arrangements.
466 //
467 // The body's `b_future` alone can under-report: an earlier binding may only
468 // inherit `has_future_updates` via a Variable to a *later* binding, which the
469 // sequential sweep can't observe at the time the earlier binding is lowered.
470 // A precise fix would require a fixpoint (or the MIR `Analysis` framework with
471 // a `true ⊑ false` lattice). As a cheap correct alternative, OR with the
472 // bindings' future flags: any cross-binding propagation must originate from a
473 // local temporal predicate inside *some* binding, so the OR captures it
474 // without forcing bucketing on a fully non-temporal LetRec.
475 let lir_id = self.allocate_lir_id();
476 LoweredExpr {
477 plan: PlanNode::LetRec {
478 ids: ids.clone(),
479 values: lir_values,
480 limits: limits.clone(),
481 body: Box::new(body),
482 }
483 .as_plan(lir_id),
484 keys: b_keys,
485 has_future_updates: b_future || any_v_future,
486 }
487 }
488 MirRelationExpr::FlatMap {
489 input: flat_map_input,
490 func,
491 exprs,
492 } => {
493 // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
494 // fused into the lowered `Reduce`.
495 //
496 // In theory, we could have implemented this also as an MIR transform. However, this
497 // is more of a physical optimization, which are sometimes unpleasant to make a part
498 // of the MIR pipeline. The specific problem here with putting this into the MIR
499 // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
500 // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
501 // emit multiple rows per group. Such semantic changes of MIR are very scary, since
502 // various parts of the optimizer assume that Reduce emits only 1 row per group, and
503 // it would be very hard to hunt down all these parts. (For example, key inference
504 // infers the group key as a unique key.)
505 let fused_with_reduce = 'fusion: {
506 if !matches!(func, TableFunc::UnnestList { .. }) {
507 break 'fusion None;
508 }
509 // We might have a Project of a single col between the FlatMap and the
510 // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
511 // result of the window function.)
512 let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
513 input: project_input,
514 outputs: projection,
515 } = &**flat_map_input
516 {
517 // We want this to be a single column, because we'll want to deal with only
518 // one aggregation in the `Reduce`. (The aggregation of a window function
519 // always stands alone currently: we plan them separately from other
520 // aggregations, and Reduces are never fused. When window functions are
521 // fused with each other, they end up in one aggregation. When there are
522 // multiple window functions in the same SELECT, but can't be fused, they
523 // end up in different Reduces.)
524 if let &[single_col] = &**projection {
525 (project_input, single_col)
526 } else {
527 break 'fusion None;
528 }
529 } else {
530 (flat_map_input, 0)
531 };
532 if let MirRelationExpr::Reduce {
533 input,
534 group_key,
535 aggregates,
536 monotonic,
537 expected_group_size,
538 } = &**maybe_reduce
539 {
540 if group_key.len() != num_grouping_keys
541 || aggregates.len() != 1
542 || !aggregates[0].func.can_fuse_with_unnest_list()
543 {
544 break 'fusion None;
545 }
546 // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
547 // above the FlatMap. Later, we'll mutate this to be the residual MFP that
548 // didn't get fused into the `Reduce`.
549 let non_fused_mfp_above_flat_map = &mut mfp;
550 let reduce_output_arity = num_grouping_keys + 1;
551 // We are fusing away the list that the FlatMap would have been unnesting,
552 // so the column that had that list disappears, so we have to permute the
553 // MFP above the FlatMap with this column disappearance.
554 let tweaked_mfp = {
555 let mut mfp = non_fused_mfp_above_flat_map.clone();
556 if mfp.demand().contains(&0) {
557 // I don't think this can happen currently that this MFP would
558 // refer to the list column, because both the list column and the
559 // MFP were constructed by the HIR-to-MIR lowering, so it's not just
560 // some random MFP that we are seeing here. But anyhow, it's better
561 // to check this here for robustness against future code changes.
562 break 'fusion None;
563 }
564 let permutation: BTreeMap<_, _> =
565 (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
566 mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
567 mfp
568 };
569 // We now put together the project that was before the FlatMap, and the
570 // tweaked version of the MFP that was after the FlatMap.
571 // (Part of this MFP might be fused into the Reduce.)
572 let mut project_and_tweaked_mfp = {
573 let mut mfp = MapFilterProject::new(reduce_output_arity);
574 mfp = mfp.project(vec![num_grouping_keys]);
575 mfp = MapFilterProject::compose(mfp, tweaked_mfp);
576 mfp
577 };
578 let fused = self.lower_reduce(
579 input,
580 group_key,
581 aggregates,
582 monotonic,
583 expected_group_size,
584 &mut project_and_tweaked_mfp,
585 true,
586 )?;
587 // Update the residual MFP.
588 *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
589 Some(fused)
590 } else {
591 break 'fusion None;
592 }
593 };
594 if let Some(fused_with_reduce) = fused_with_reduce {
595 fused_with_reduce
596 } else {
597 // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
598 let LoweredExpr {
599 plan: input,
600 keys,
601 has_future_updates: input_future,
602 } = self.lower_mir_expr(flat_map_input)?;
603 // This stage can absorb arbitrary MFP instances.
604 let mfp = mfp.take();
605 let mut exprs = exprs.clone();
606 let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
607 {
608 // We don't permute the MFP here, because it runs _after_ the table function,
609 // whose output is in a fixed order.
610 //
611 // We _do_, however, need to permute the `expr`s that provide input to the
612 // `func`.
613 let permutation = permutation.iter().cloned().enumerate().collect();
614 for expr in &mut exprs {
615 expr.permute_map(&permutation);
616 }
617
618 Some(k.clone())
619 } else {
620 None
621 };
622
623 let lir_id = self.allocate_lir_id();
624 // The absorbed `mfp` may contain temporal predicates, which can
625 // introduce future-stamped updates that aren't present on the input.
626 let has_future_updates = input_future || mfp.has_temporal_predicates();
627 // Return the plan, and no arrangements.
628 LoweredExpr {
629 plan: PlanNode::FlatMap {
630 input_key,
631 input: Box::new(input),
632 exprs: exprs.clone(),
633 func: func.clone(),
634 mfp_after: mfp,
635 }
636 .as_plan(lir_id),
637 keys: AvailableCollections::new_raw(),
638 has_future_updates,
639 }
640 }
641 }
642 MirRelationExpr::Join {
643 inputs,
644 equivalences,
645 implementation,
646 } => {
647 // Plan each of the join inputs independently.
648 // The `plans` get surfaced upwards, and the `input_keys` should
649 // be used as part of join planning / to validate the existing
650 // plans / to aid in indexed seeding of update streams.
651 let mut plans = Vec::new();
652 let mut input_keys = Vec::new();
653 let mut input_arities = Vec::new();
654 let mut input_futures = Vec::new();
655 for input in inputs.iter() {
656 let LoweredExpr {
657 plan,
658 keys,
659 has_future_updates: input_future,
660 } = self.lower_mir_expr(input)?;
661 input_arities.push(input.arity());
662 plans.push(plan);
663 input_keys.push(keys);
664 input_futures.push(input_future);
665 }
666 let any_input_future = input_futures.iter().any(|&f| f);
667
668 let input_mapper =
669 JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
670
671 // Extract temporal predicates as joins cannot currently absorb them.
672 let (plan, missing) = match implementation {
673 IndexedFilter(_coll_id, _idx_id, key, _val) => {
674 // Start with the constant input. (This used to be important before database-issues#4016
675 // was fixed.)
676 let start: usize = 1;
677 let order = vec![(0usize, key.clone(), None)];
678 // All columns of the constant input will be part of the arrangement key.
679 let source_arrangement = (
680 (0..key.len())
681 .map(MirScalarExpr::column)
682 .collect::<Vec<_>>(),
683 (0..key.len()).collect::<Vec<_>>(),
684 Vec::<usize>::new(),
685 );
686 let (ljp, missing) = LinearJoinPlan::create_from(
687 start,
688 Some(&source_arrangement),
689 equivalences,
690 &order,
691 input_mapper,
692 &mut mfp,
693 &input_keys,
694 );
695 (JoinPlan::Linear(ljp), missing)
696 }
697 Differential((start, start_arr, _start_characteristic), order) => {
698 let source_arrangement = start_arr.as_ref().and_then(|key| {
699 input_keys[*start]
700 .arranged
701 .iter()
702 .find(|(k, _, _)| k == key)
703 .clone()
704 });
705 let (ljp, missing) = LinearJoinPlan::create_from(
706 *start,
707 source_arrangement,
708 equivalences,
709 order,
710 input_mapper,
711 &mut mfp,
712 &input_keys,
713 );
714 (JoinPlan::Linear(ljp), missing)
715 }
716 DeltaQuery(orders) => {
717 let (djp, missing) = DeltaJoinPlan::create_from(
718 equivalences,
719 orders,
720 input_mapper,
721 &mut mfp,
722 &input_keys,
723 );
724 (JoinPlan::Delta(djp), missing)
725 }
726 // Other plans are errors, and should be reported as such.
727 Unimplemented => return Err("unimplemented join".to_string()),
728 };
729 // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
730 // `missing`. We thus need to plan them here so they'll exist.
731 let is_delta = matches!(plan, JoinPlan::Delta(_));
732 for ((((input_plan, input_keys), missing), arity), input_future) in plans
733 .iter_mut()
734 .zip_eq(input_keys.iter())
735 .zip_eq(missing)
736 .zip_eq(input_arities.iter().cloned())
737 .zip_eq(input_futures.iter().copied())
738 {
739 if missing != Default::default() {
740 if is_delta {
741 // join_implementation.rs produced a sub-optimal plan here;
742 // we shouldn't plan delta joins at all if not all of the required
743 // arrangements are available. Soft panic in CI and log an error in
744 // production to increase the chances that we will catch all situations
745 // that violate this constraint.
746 soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
747Dataflow info: {}
748This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
749 } else {
750 soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
751Dataflow info: {}
752This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
753 // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
754 // Join input. (Earlier, they were missing in the following cases:
755 // - They were const-folded away for constant inputs. This is not
756 // happening since
757 // https://github.com/MaterializeInc/materialize/pull/16351
758 // - They were not being inserted for the constant input of
759 // `IndexedFilter`s. This was fixed in
760 // https://github.com/MaterializeInc/materialize/pull/20920
761 // - They were not being inserted for the first input of Differential
762 // joins. This was fixed in
763 // https://github.com/MaterializeInc/materialize/pull/16099)
764 }
765 let lir_id = self.allocate_lir_id();
766 let raw_plan = std::mem::replace(
767 input_plan,
768 PlanNode::Constant {
769 rows: Ok(Vec::new()),
770 }
771 .as_plan(lir_id),
772 );
773 *input_plan =
774 self.arrange_by(raw_plan, missing, input_keys, arity, input_future);
775 }
776 }
777 // Return the plan, and no arrangements.
778 // Both linear and delta join planning extract temporal predicates back into the
779 // residual `mfp` (see `LinearJoinPlan::create_from` / `DeltaJoinPlan::create_from`),
780 // so the absorbed MFP cannot introduce future updates — the join's output future
781 // flag is just the OR of its inputs.
782 let lir_id = self.allocate_lir_id();
783 LoweredExpr {
784 plan: PlanNode::Join {
785 inputs: plans,
786 plan,
787 }
788 .as_plan(lir_id),
789 keys: AvailableCollections::new_raw(),
790 has_future_updates: any_input_future,
791 }
792 }
793 MirRelationExpr::Reduce {
794 input,
795 group_key,
796 aggregates,
797 monotonic,
798 expected_group_size,
799 } => {
800 if aggregates
801 .iter()
802 .any(|agg| agg.func.can_fuse_with_unnest_list())
803 {
804 // This case should have been handled at the `MirRelationExpr::FlatMap` case
805 // above. But that has a pretty complicated pattern matching, so it's not
806 // unthinkable that it fails.
807 soft_panic_or_log!(
808 "Window function performance issue: `reduce_unnest_list_fusion` failed"
809 );
810 }
811 self.lower_reduce(
812 input,
813 group_key,
814 aggregates,
815 monotonic,
816 expected_group_size,
817 &mut mfp,
818 false,
819 )?
820 }
821 MirRelationExpr::TopK {
822 input,
823 group_key,
824 order_key,
825 limit,
826 offset,
827 monotonic,
828 expected_group_size,
829 } => {
830 let arity = input.arity();
831 let LoweredExpr {
832 plan: input,
833 keys,
834 has_future_updates: input_future,
835 } = self.lower_mir_expr(input)?;
836
837 let top_k_plan = TopKPlan::create_from(
838 group_key.clone(),
839 order_key.clone(),
840 *offset,
841 limit.clone(),
842 arity,
843 *monotonic,
844 *expected_group_size,
845 );
846
847 // We don't have an MFP here -- install an operator to permute the
848 // input, if necessary.
849 let input = if !keys.raw {
850 self.arrange_by(
851 input,
852 AvailableCollections::new_raw(),
853 &keys,
854 arity,
855 input_future,
856 )
857 } else {
858 input
859 };
860 // Return the plan, and no arrangements.
861 let lir_id = self.allocate_lir_id();
862 LoweredExpr {
863 plan: PlanNode::TopK {
864 input: Box::new(input),
865 top_k_plan,
866 }
867 .as_plan(lir_id),
868 keys: AvailableCollections::new_raw(),
869 has_future_updates: input_future,
870 }
871 }
872 MirRelationExpr::Negate { input } => {
873 let arity = input.arity();
874 let LoweredExpr {
875 plan: input,
876 keys,
877 has_future_updates: input_future,
878 } = self.lower_mir_expr(input)?;
879
880 // We don't have an MFP here -- install an operator to permute the
881 // input, if necessary.
882 let input = if !keys.raw {
883 self.arrange_by(
884 input,
885 AvailableCollections::new_raw(),
886 &keys,
887 arity,
888 input_future,
889 )
890 } else {
891 input
892 };
893 // Return the plan, and no arrangements.
894 let lir_id = self.allocate_lir_id();
895 LoweredExpr {
896 plan: PlanNode::Negate {
897 input: Box::new(input),
898 }
899 .as_plan(lir_id),
900 keys: AvailableCollections::new_raw(),
901 has_future_updates: input_future,
902 }
903 }
904 MirRelationExpr::Threshold { input } => {
905 let LoweredExpr {
906 plan,
907 keys,
908 has_future_updates: input_future,
909 } = self.lower_mir_expr(input)?;
910 let arity = input.arity();
911 let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
912 let plan = if !keys
913 .arranged
914 .iter()
915 .any(|(key, _, _)| key == &required_arrangement.0)
916 {
917 self.arrange_by(
918 plan,
919 AvailableCollections::new_arranged(vec![required_arrangement]),
920 &keys,
921 arity,
922 input_future,
923 )
924 } else {
925 plan
926 };
927
928 let output_keys = threshold_plan.keys();
929 // Return the plan, and any produced keys.
930 let lir_id = self.allocate_lir_id();
931 LoweredExpr {
932 plan: PlanNode::Threshold {
933 input: Box::new(plan),
934 threshold_plan,
935 }
936 .as_plan(lir_id),
937 keys: output_keys,
938 has_future_updates: input_future,
939 }
940 }
941 MirRelationExpr::Union { base, inputs } => {
942 let arity = base.arity();
943 let mut lowered_inputs = Vec::with_capacity(1 + inputs.len());
944 lowered_inputs.push(self.lower_mir_expr(base)?);
945 for input in inputs.iter() {
946 lowered_inputs.push(self.lower_mir_expr(input)?);
947 }
948 let any_future = lowered_inputs.iter().any(|l| l.has_future_updates);
949 let plans = lowered_inputs
950 .into_iter()
951 .map(
952 |LoweredExpr {
953 plan,
954 keys,
955 has_future_updates: future,
956 }| {
957 // We don't have an MFP here -- install an operator to permute the
958 // input, if necessary.
959 if !keys.raw {
960 self.arrange_by(
961 plan,
962 AvailableCollections::new_raw(),
963 &keys,
964 arity,
965 future,
966 )
967 } else {
968 plan
969 }
970 },
971 )
972 .collect();
973 // Return the plan and no arrangements.
974 let lir_id = self.allocate_lir_id();
975 LoweredExpr {
976 plan: PlanNode::Union {
977 inputs: plans,
978 consolidate_output: false,
979 }
980 .as_plan(lir_id),
981 keys: AvailableCollections::new_raw(),
982 has_future_updates: any_future,
983 }
984 }
985 MirRelationExpr::ArrangeBy { input, keys } => {
986 let input_mir = input;
987 let LoweredExpr {
988 plan: input,
989 keys: mut input_keys,
990 has_future_updates: future,
991 } = self.lower_mir_expr(input)?;
992 // Fill the `types` in `input_keys` if not already present.
993 let arity = input_mir.arity();
994
995 // Determine keys that are not present in `input_keys`.
996 let new_keys = keys
997 .iter()
998 .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
999 .cloned()
1000 .collect::<Vec<_>>();
1001 if new_keys.is_empty() {
1002 LoweredExpr {
1003 plan: input,
1004 keys: input_keys,
1005 has_future_updates: future,
1006 }
1007 } else {
1008 let mut new_keys = new_keys
1009 .iter()
1010 .cloned()
1011 .map(|k| {
1012 let (permutation, thinning) = permutation_for_arrangement(&k, arity);
1013 (k, permutation, thinning)
1014 })
1015 .collect::<Vec<_>>();
1016 let forms = AvailableCollections {
1017 raw: input_keys.raw,
1018 arranged: new_keys.clone(),
1019 };
1020 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1021 input_keys.arbitrary_arrangement()
1022 {
1023 let mut mfp = MapFilterProject::new(arity);
1024 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1025 (Some(input_key.clone()), mfp)
1026 } else {
1027 (None, MapFilterProject::new(arity))
1028 };
1029 input_keys.arranged.append(&mut new_keys);
1030 input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1031
1032 // Return the plan and extended keys.
1033 let lir_id = self.allocate_lir_id();
1034 LoweredExpr {
1035 plan: PlanNode::ArrangeBy {
1036 input_key,
1037 input: Box::new(input),
1038 input_mfp,
1039 forms,
1040 strategy: strategy_from_future(future),
1041 }
1042 .as_plan(lir_id),
1043 keys: input_keys,
1044 has_future_updates: future,
1045 }
1046 }
1047 }
1048 };
1049
1050 // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
1051 if !mfp.is_identity() {
1052 // Check if this MFP introduces future updates.
1053 let mfp_is_temporal = mfp.has_temporal_predicates();
1054 has_future_updates = has_future_updates || mfp_is_temporal;
1055 // Seek out an arrangement key that might be constrained to a literal.
1056 // TODO: Improve key selection heuristic.
1057 let key_val = keys
1058 .arranged
1059 .iter()
1060 .filter_map(|(key, permutation, thinning)| {
1061 let mut mfp = mfp.clone();
1062 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1063 mfp.literal_constraints(key)
1064 .map(|val| (key.clone(), permutation, thinning, val))
1065 })
1066 .max_by_key(|(key, _, _, _)| key.len());
1067
1068 // Input key selection strategy:
1069 // (1) If we can read a key at a particular value, do so
1070 // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
1071 // therefore allows us to avoid discarding the arrangement, use that.
1072 // (3) Otherwise, if there is _some_ key, use that,
1073 // (4) Otherwise just read the raw collection.
1074 let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
1075 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1076
1077 Some((key, Some(val)))
1078 } else if let Some((key, permutation, thinning)) =
1079 keys.arranged.iter().find(|(key, permutation, thinning)| {
1080 let mut mfp = mfp.clone();
1081 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1082 mfp.is_identity()
1083 })
1084 {
1085 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1086 Some((key.clone(), None))
1087 } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
1088 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1089 Some((key.clone(), None))
1090 } else {
1091 None
1092 };
1093
1094 if mfp.is_identity() {
1095 // We have discovered a key
1096 // whose permutation causes the MFP to actually
1097 // be the identity! We can keep it around,
1098 // but without its permutation this time,
1099 // and with a trivial thinning of the right length.
1100 let (key, val) = input_key_val.unwrap();
1101 let (_old_key, old_permutation, old_thinning) = keys
1102 .arranged
1103 .iter_mut()
1104 .find(|(key2, _, _)| key2 == &key)
1105 .unwrap();
1106 *old_permutation = (0..mfp.input_arity).collect();
1107 let old_thinned_arity = old_thinning.len();
1108 *old_thinning = (0..old_thinned_arity).collect();
1109 // Get rid of all other forms, as this is now the only one known to be valid.
1110 // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
1111 // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
1112 keys.arranged.retain(|(key2, _, _)| key2 == &key);
1113 keys.raw = false;
1114
1115 // Creating a Plan::Mfp node is now logically unnecessary, but we
1116 // should do so anyway when `val` is populated, so that
1117 // the `key_val` optimization gets applied.
1118 let lir_id = self.allocate_lir_id();
1119 if val.is_some() {
1120 plan = PlanNode::Mfp {
1121 input: Box::new(plan),
1122 mfp,
1123 input_key_val: Some((key, val)),
1124 }
1125 .as_plan(lir_id)
1126 }
1127 } else {
1128 let lir_id = self.allocate_lir_id();
1129 plan = PlanNode::Mfp {
1130 input: Box::new(plan),
1131 mfp,
1132 input_key_val,
1133 }
1134 .as_plan(lir_id);
1135 keys = AvailableCollections::new_raw();
1136 }
1137 }
1138
1139 Ok(LoweredExpr {
1140 plan,
1141 keys,
1142 has_future_updates,
1143 })
1144 }
1145
1146 /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
1147 /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
1148 /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
1149 /// fused.
1150 fn lower_reduce(
1151 &mut self,
1152 input: &MirRelationExpr,
1153 group_key: &Vec<MirScalarExpr>,
1154 aggregates: &Vec<AggregateExpr>,
1155 monotonic: &bool,
1156 expected_group_size: &Option<u64>,
1157 mfp_on_top: &mut MapFilterProject,
1158 fused_unnest_list: bool,
1159 ) -> Result<LoweredExpr, String> {
1160 let input_arity = input.arity();
1161 let LoweredExpr {
1162 plan: input,
1163 keys,
1164 has_future_updates: input_future,
1165 } = self.lower_mir_expr(input)?;
1166 let (input_key, permutation_and_new_arity) =
1167 if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
1168 (
1169 Some(input_key.clone()),
1170 Some((permutation.clone(), thinning.len() + input_key.len())),
1171 )
1172 } else {
1173 (None, None)
1174 };
1175 let key_val_plan = KeyValPlan::new(
1176 input_arity,
1177 group_key,
1178 aggregates,
1179 permutation_and_new_arity,
1180 );
1181 let reduce_plan = ReducePlan::create_from(
1182 aggregates.clone(),
1183 *monotonic,
1184 *expected_group_size,
1185 fused_unnest_list,
1186 );
1187 // Return the plan, and the keys it produces.
1188 let mfp_after;
1189 let output_arity;
1190 if self.enable_reduce_mfp_fusion {
1191 (mfp_after, *mfp_on_top, output_arity) =
1192 reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1193 } else {
1194 (mfp_after, output_arity) = (
1195 MapFilterProject::new(mfp_on_top.input_arity),
1196 group_key.len() + aggregates.len(),
1197 );
1198 }
1199 soft_assert_eq_or_log!(
1200 mfp_on_top.input_arity,
1201 output_arity,
1202 "Output arity of reduce must match input arity for MFP on top of it"
1203 );
1204 let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1205 let lir_id = self.allocate_lir_id();
1206 // `extract_mfp_after` strips temporal predicates back into `*mfp_on_top` (the residual
1207 // MFP installed above the reduce), so `mfp_after` is non-temporal and cannot introduce
1208 // future updates. The output's future flag is just whatever the input had.
1209 Ok(LoweredExpr {
1210 plan: PlanNode::Reduce {
1211 input_key,
1212 input: Box::new(input),
1213 key_val_plan,
1214 plan: reduce_plan,
1215 mfp_after,
1216 }
1217 .as_plan(lir_id),
1218 keys: output_keys,
1219 has_future_updates: input_future,
1220 })
1221 }
1222
1223 /// Replace the plan with another one
1224 /// that has the collection in some additional forms.
1225 pub fn arrange_by(
1226 &mut self,
1227 plan: Plan,
1228 collections: AvailableCollections,
1229 old_collections: &AvailableCollections,
1230 arity: usize,
1231 has_future_updates: bool,
1232 ) -> Plan {
1233 if let Plan {
1234 node:
1235 PlanNode::ArrangeBy {
1236 input_key,
1237 input,
1238 input_mfp,
1239 mut forms,
1240 strategy,
1241 },
1242 lir_id,
1243 } = plan
1244 {
1245 forms.raw |= collections.raw;
1246 forms.arranged.extend(collections.arranged);
1247 forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1248 forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1249 PlanNode::ArrangeBy {
1250 input_key,
1251 input,
1252 input_mfp,
1253 forms,
1254 strategy,
1255 }
1256 .as_plan(lir_id)
1257 } else {
1258 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1259 old_collections.arbitrary_arrangement()
1260 {
1261 let mut mfp = MapFilterProject::new(arity);
1262 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1263 (Some(input_key.clone()), mfp)
1264 } else {
1265 (None, MapFilterProject::new(arity))
1266 };
1267 let lir_id = self.allocate_lir_id();
1268
1269 PlanNode::ArrangeBy {
1270 input_key,
1271 input: Box::new(plan),
1272 input_mfp,
1273 forms: collections,
1274 strategy: strategy_from_future(has_future_updates),
1275 }
1276 .as_plan(lir_id)
1277 }
1278 }
1279}
1280
1281/// Various bits of state to print along with error messages during LIR planning,
1282/// to aid debugging.
1283#[derive(Clone, Debug)]
1284pub struct LirDebugInfo {
1285 debug_name: String,
1286 id: GlobalId,
1287}
1288
1289impl std::fmt::Display for LirDebugInfo {
1290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1291 write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1292 }
1293}