mz_compute_types/plan/lowering.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
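//!
//! At a high level, lowering proceeds in two steps: a [`Context`] is created with
//! [`Context::new`], and [`Context::lower`] then walks the dataflow's `objects_to_build`
//! in order, translating each MIR expression into a [`Plan`]. A minimal sketch of the
//! entry point (assuming an already-optimized `desc: DataflowDescription<OptimizedMirRelationExpr>`
//! and `features: &OptimizerFeatures` are in scope):
//!
//! ```ignore
//! let ctx = Context::new(desc.debug_name.clone(), features);
//! let lowered: DataflowDescription<Plan<mz_repr::Timestamp>> = ctx.lower(desc)?;
//! ```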
11
12use std::collections::BTreeMap;
13
14use columnar::Len;
15use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
16use mz_expr::{
17 AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
18 OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
19};
20use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
21use mz_repr::GlobalId;
22use mz_repr::optimize::OptimizerFeatures;
23use timely::Container;
24use timely::progress::Timestamp;
25
26use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
27use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
28use crate::plan::reduce::{KeyValPlan, ReducePlan};
29use crate::plan::threshold::ThresholdPlan;
30use crate::plan::top_k::TopKPlan;
31use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
32
33pub(super) struct Context {
34 /// Known bindings to (possibly arranged) collections.
35 arrangements: BTreeMap<Id, AvailableCollections>,
36 /// Tracks the next available `LirId`.
37 next_lir_id: LirId,
38 /// Information to print along with error messages.
39 debug_info: LirDebugInfo,
40 /// Whether to enable fusion of MFPs in reductions.
41 enable_reduce_mfp_fusion: bool,
42}
43
44impl Context {
45 pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
46 Self {
47 arrangements: Default::default(),
48 next_lir_id: LirId(1),
49 debug_info: LirDebugInfo {
50 debug_name,
51 id: GlobalId::Transient(0),
52 },
53 enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
54 }
55 }
56
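    /// Allocates the next available [`LirId`], advancing the internal counter and
    /// panicking on overflow.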
57 fn allocate_lir_id(&mut self) -> LirId {
58 let id = self.next_lir_id;
59 self.next_lir_id = LirId(
60 self.next_lir_id
61 .0
62 .checked_add(1)
63 .expect("No LirId overflow"),
64 );
65 id
66 }
67
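    /// Lowers the given dataflow description from MIR to LIR: imported indexes and sources
    /// are first registered as available (arranged or raw) collections, and then each object
    /// in `objects_to_build` is lowered in order, recording the arrangements it produces for
    /// use by later objects.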
68 pub fn lower<T: Timestamp>(
69 mut self,
70 desc: DataflowDescription<OptimizedMirRelationExpr>,
71 ) -> Result<DataflowDescription<Plan<T>>, String> {
        // Sources might provide arranged forms of their data in the future.
        // Indexes already provide arranged forms of their data.
74 for IndexImport {
75 desc: index_desc,
76 typ,
77 ..
78 } in desc.index_imports.values()
79 {
80 let key = index_desc.key.clone();
81 // TODO[btv] - We should be told the permutation by
82 // `index_desc`, and it should have been generated
83 // at the same point the thinning logic was.
84 //
85 // We should for sure do that soon, but it requires
86 // a bit of a refactor, so for now we just
87 // _assume_ that they were both generated by `permutation_for_arrangement`,
88 // and recover it here.
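            // (Illustrative sketch: for a key such as `[#1]` over a three-column relation,
            // `permutation_for_arrangement` yields a thinning that keeps only the non-key
            // columns in the arrangement's values, and a permutation locating each original
            // column within the concatenation of key columns and thinned value columns.)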
89 let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
90 let index_keys = self
91 .arrangements
92 .entry(Id::Global(index_desc.on_id))
93 .or_insert_with(AvailableCollections::default);
94 index_keys.arranged.push((key, permutation, thinning));
95 }
96 for id in desc.source_imports.keys() {
97 self.arrangements
98 .entry(Id::Global(*id))
99 .or_insert_with(AvailableCollections::new_raw);
100 }
101
102 // Build each object in order, registering the arrangements it forms.
103 let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
104 for build in desc.objects_to_build {
105 self.debug_info.id = build.id;
106 let (plan, keys) = self.lower_mir_expr(&build.plan)?;
107
108 self.arrangements.insert(Id::Global(build.id), keys);
109 objects_to_build.push(BuildDesc { id: build.id, plan });
110 }
111
112 Ok(DataflowDescription {
113 source_imports: desc.source_imports,
114 index_imports: desc.index_imports,
115 objects_to_build,
116 index_exports: desc.index_exports,
117 sink_exports: desc.sink_exports,
118 as_of: desc.as_of,
119 until: desc.until,
120 initial_storage_as_of: desc.initial_storage_as_of,
121 refresh_schedule: desc.refresh_schedule,
122 debug_name: desc.debug_name,
123 time_dependence: desc.time_dependence,
124 })
125 }
126
    /// This method converts a [`MirRelationExpr`] into a plan that can be directly rendered.
128 ///
129 /// The rough structure is that we repeatedly extract map/filter/project operators
130 /// from each expression we see, bundle them up as a `MapFilterProject` object, and
131 /// then produce a plan for the combination of that with the next operator.
132 ///
133 /// The method accesses `self.arrangements`, which it will locally add to and remove from for
134 /// `Let` bindings (by the end of the call it should contain the same bindings as when it
135 /// started).
136 ///
    /// The result of the method is both a `Plan` and a list of arrangements that are
    /// certain to be produced, which can be relied on by the next steps in the plan.
139 /// Each of the arrangement keys is associated with an MFP that must be applied if that
140 /// arrangement is used, to back out the permutation associated with that arrangement.
141 ///
142 /// An empty list of arrangement keys indicates that only a `Collection` stream can
143 /// be assumed to exist.
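    ///
    /// For example (informally): lowering an `ArrangeBy` typically returns the plan together
    /// with the arrangement keys it forms, while lowering a `TopK` returns the plan with no
    /// arrangement keys, since only a raw collection is produced.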
144 fn lower_mir_expr<T: Timestamp>(
145 &mut self,
146 expr: &MirRelationExpr,
147 ) -> Result<(Plan<T>, AvailableCollections), String> {
        // This function is recursive and can overflow its stack, so grow it if
        // needed. The growth here is unbounded. Our general solution for this problem
        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
        // depth. That, however, requires upstream error handling. This function is
        // currently called by the Coordinator after calls to `catalog_transact`,
        // and thus is not allowed to fail. Until that call path can surface errors, we
        // choose to allow the unbounded growth here. We are somewhat protected, though,
        // by higher levels enforcing their own limits on stack depth (in the parser,
        // transformer/desugarer, and planner).
157 mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
158 }
159
160 fn lower_mir_expr_stack_safe<T>(
161 &mut self,
162 expr: &MirRelationExpr,
163 ) -> Result<(Plan<T>, AvailableCollections), String>
164 where
165 T: Timestamp,
166 {
        // Extract a maximally large MapFilterProject from `expr`.
        // We will then try to push this into the resulting expression.
169 //
170 // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
171 // While we would eventually like all plan stages to be able to absorb such
172 // general operators, not all of them can.
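        //
        // For example (an informal sketch): given an expression shaped like
        // `Filter(Map(Get(x), exprs), predicates)`, the extraction bundles the maps and
        // filters into `mfp` and leaves `expr` pointing at the inner `Get(x)`.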
173 let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
174 // We attempt to plan what we have remaining, in the context of `mfp`.
175 // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
176 let (mut plan, mut keys) = match expr {
177 // These operators should have been extracted from the expression.
178 MirRelationExpr::Map { .. } => {
179 panic!("This operator should have been extracted");
180 }
181 MirRelationExpr::Filter { .. } => {
182 panic!("This operator should have been extracted");
183 }
184 MirRelationExpr::Project { .. } => {
185 panic!("This operator should have been extracted");
186 }
187 // These operators may not have been extracted, and need to result in a `Plan`.
188 MirRelationExpr::Constant { rows, typ: _ } => {
189 let lir_id = self.allocate_lir_id();
190 let node = PlanNode::Constant {
191 rows: rows.clone().map(|rows| {
192 rows.into_iter()
193 .map(|(row, diff)| (row, T::minimum(), diff))
194 .collect()
195 }),
196 };
197 // The plan, not arranged in any way.
198 (node.as_plan(lir_id), AvailableCollections::new_raw())
199 }
200 MirRelationExpr::Get { id, typ: _, .. } => {
201 // This stage can absorb arbitrary MFP operators.
202 let mut mfp = mfp.take();
203 // If `mfp` is the identity, we can surface all imported arrangements.
204 // Otherwise, we apply `mfp` and promise no arrangements.
205 let mut in_keys = self
206 .arrangements
207 .get(id)
208 .cloned()
209 .unwrap_or_else(AvailableCollections::new_raw);
210
                // Seek out an arrangement key that might be constrained to a literal.
                // Note: this code has very little use nowadays, as its job was mostly taken
                // over by `LiteralConstraints` (see the longer comment below).
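                // For example (illustrative only): with an arrangement keyed by `[#0]` and an
                // `mfp` whose filters pin `#0` to a literal, `literal_constraints` returns that
                // literal, and the `Get` below becomes a point lookup into the arrangement
                // rather than a full scan.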
214 let key_val = in_keys
215 .arranged
216 .iter()
217 .filter_map(|key| {
218 mfp.literal_constraints(&key.0)
219 .map(|val| (key.clone(), val))
220 })
221 .max_by_key(|(key, _val)| key.0.len());
222
223 // Determine the plan of action for the `Get` stage.
224 let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
225 // This code path used to handle looking up literals from indexes, but it's
226 // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
227 // MIR transform instead. However, it's still called in a couple of tricky
228 // special cases:
229 // - `LiteralConstraints` handles only Gets of global ids, so this code still
230 // gets to handle Filters on top of Gets of local ids.
231 // - Lowering does a `MapFilterProject::extract_from_expression`, while
232 // `LiteralConstraints` does
233 // `MapFilterProject::extract_non_errors_from_expr_mut`.
234 // - It might happen that new literal constraint optimization opportunities
235 // appear somewhere near the end of the MIR optimizer after
236 // `LiteralConstraints` has already run.
237 // (Also note that a similar literal constraint handling machinery is also
238 // present when handling the leftover MFP after this big match.)
239 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
240 in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
241 GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
242 } else if !mfp.is_identity() {
243 // We need to ensure a collection exists, which means we must form it.
244 if let Some((key, permutation, thinning)) =
245 in_keys.arbitrary_arrangement().cloned()
246 {
247 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
248 in_keys.arranged = vec![(key.clone(), permutation, thinning)];
249 GetPlan::Arrangement(key, None, mfp)
250 } else {
251 GetPlan::Collection(mfp)
252 }
253 } else {
254 // By default, just pass input arrangements through.
255 GetPlan::PassArrangements
256 };
257
258 let out_keys = if let GetPlan::PassArrangements = plan {
259 in_keys.clone()
260 } else {
261 AvailableCollections::new_raw()
262 };
263
264 let lir_id = self.allocate_lir_id();
265 let node = PlanNode::Get {
266 id: id.clone(),
267 keys: in_keys,
268 plan,
269 };
270 // Return the plan, and any keys if an identity `mfp`.
271 (node.as_plan(lir_id), out_keys)
272 }
273 MirRelationExpr::Let { id, value, body } => {
                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
                // that it would have been pushed down. It is not clear whether we should
                // take the initiative to push down the `mfp` ourselves.
277
278 // Plan the value using only the initial arrangements, but
279 // introduce any resulting arrangements bound to `id`.
280 let (value, v_keys) = self.lower_mir_expr(value)?;
281 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
282 assert_none!(pre_existing);
283 // Plan the body using initial and `value` arrangements,
284 // and then remove reference to the value arrangements.
285 let (body, b_keys) = self.lower_mir_expr(body)?;
286 self.arrangements.remove(&Id::Local(*id));
287 // Return the plan, and any `body` arrangements.
288 let lir_id = self.allocate_lir_id();
289 (
290 PlanNode::Let {
291 id: id.clone(),
292 value: Box::new(value),
293 body: Box::new(body),
294 }
295 .as_plan(lir_id),
296 b_keys,
297 )
298 }
299 MirRelationExpr::LetRec {
300 ids,
301 values,
302 limits,
303 body,
304 } => {
305 assert_eq!(ids.len(), values.len());
306 assert_eq!(ids.len(), limits.len());
307 // Plan the values using only the available arrangements, but
308 // introduce any resulting arrangements bound to each `id`.
309 // Arrangements made available cannot be used by prior bindings,
310 // as we cannot circulate an arrangement through a `Variable` yet.
311 let mut lir_values = Vec::with_capacity(values.len());
312 for (id, value) in ids.iter().zip(values) {
313 let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
314 // If `v_keys` does not contain an unarranged collection, we must form it.
315 if !v_keys.raw {
316 // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
317 let (input_key, permutation, thinning) =
318 v_keys.arbitrary_arrangement().unwrap();
319 let mut input_mfp = MapFilterProject::new(value.arity());
320 input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
321 let input_key = Some(input_key.clone());
322
323 let forms = AvailableCollections::new_raw();
324
325 // We just want to insert an `ArrangeBy` to form an unarranged collection,
326 // but there is a complication: We shouldn't break the invariant (created by
327 // `NormalizeLets`, and relied upon by the rendering) that there isn't
328 // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
329 // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
330 // instead of on top of the inner `LetRec`.
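                        // In other words (informally): instead of producing
                        // `ArrangeBy(LetRec { .., body })`, which would violate the invariant,
                        // we produce `LetRec { .., body: ArrangeBy(body) }`.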
331 lir_value = match lir_value {
332 Plan {
333 node:
334 PlanNode::LetRec {
335 ids,
336 values,
337 limits,
338 body,
339 },
340 lir_id,
341 } => {
342 let inner_lir_id = self.allocate_lir_id();
343 PlanNode::LetRec {
344 ids,
345 values,
346 limits,
347 body: Box::new(
348 PlanNode::ArrangeBy {
349 input_key,
350 input: body,
351 input_mfp,
352 forms,
353 }
354 .as_plan(inner_lir_id),
355 ),
356 }
357 .as_plan(lir_id)
358 }
359 lir_value => {
360 let lir_id = self.allocate_lir_id();
361 PlanNode::ArrangeBy {
362 input_key,
363 input: Box::new(lir_value),
364 input_mfp,
365 forms,
366 }
367 .as_plan(lir_id)
368 }
369 };
370 v_keys.raw = true;
371 }
372 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
373 assert_none!(pre_existing);
374 lir_values.push(lir_value);
375 }
376 // As we exit the iterative scope, we must leave all arrangements behind,
377 // as they reference a timestamp coordinate that must be stripped off.
378 for id in ids.iter() {
379 self.arrangements
380 .insert(Id::Local(*id), AvailableCollections::new_raw());
381 }
382 // Plan the body using initial and `value` arrangements,
383 // and then remove reference to the value arrangements.
384 let (body, b_keys) = self.lower_mir_expr(body)?;
385 for id in ids.iter() {
386 self.arrangements.remove(&Id::Local(*id));
387 }
388 // Return the plan, and any `body` arrangements.
389 let lir_id = self.allocate_lir_id();
390 (
391 PlanNode::LetRec {
392 ids: ids.clone(),
393 values: lir_values,
394 limits: limits.clone(),
395 body: Box::new(body),
396 }
397 .as_plan(lir_id),
398 b_keys,
399 )
400 }
401 MirRelationExpr::FlatMap {
402 input: flat_map_input,
403 func,
404 exprs,
405 } => {
406 // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
407 // fused into the lowered `Reduce`.
408 //
                // In theory, we could have implemented this also as an MIR transform. However,
                // this is more of a physical optimization, and such optimizations are sometimes
                // unpleasant to make a part of the MIR pipeline. The specific problem here with
                // putting this into the MIR
412 // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
413 // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
414 // emit multiple rows per group. Such semantic changes of MIR are very scary, since
415 // various parts of the optimizer assume that Reduce emits only 1 row per group, and
416 // it would be very hard to hunt down all these parts. (For example, key inference
417 // infers the group key as a unique key.)
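                //
                // The shape we look for below is, roughly:
                //
                //   FlatMap UnnestList
                //     Project (a single column; optional)
                //       Reduce (whose single aggregation is a window function)
                //
                // in which case the `UnnestList` (and possibly part of the MFP above it) can be
                // folded into the lowered `Reduce`.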
418 let fused_with_reduce = 'fusion: {
419 if !matches!(func, TableFunc::UnnestList { .. }) {
420 break 'fusion None;
421 }
422 // We might have a Project of a single col between the FlatMap and the
423 // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
424 // result of the window function.)
425 let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
426 input: project_input,
427 outputs: projection,
428 } = &**flat_map_input
429 {
430 // We want this to be a single column, because we'll want to deal with only
431 // one aggregation in the `Reduce`. (The aggregation of a window function
432 // always stands alone currently: we plan them separately from other
433 // aggregations, and Reduces are never fused. When window functions are
434 // fused with each other, they end up in one aggregation. When there are
435 // multiple window functions in the same SELECT, but can't be fused, they
                        // multiple window functions in the same SELECT that can't be fused, they
437 if let &[single_col] = &**projection {
438 (project_input, single_col)
439 } else {
440 break 'fusion None;
441 }
442 } else {
443 (flat_map_input, 0)
444 };
445 if let MirRelationExpr::Reduce {
446 input,
447 group_key,
448 aggregates,
449 monotonic,
450 expected_group_size,
451 } = &**maybe_reduce
452 {
453 if group_key.len() != num_grouping_keys
454 || aggregates.len() != 1
455 || !aggregates[0].func.can_fuse_with_unnest_list()
456 {
457 break 'fusion None;
458 }
459 // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
460 // above the FlatMap. Later, we'll mutate this to be the residual MFP that
461 // didn't get fused into the `Reduce`.
462 let non_fused_mfp_above_flat_map = &mut mfp;
463 let reduce_output_arity = num_grouping_keys + 1;
464 // We are fusing away the list that the FlatMap would have been unnesting,
465 // so the column that had that list disappears, so we have to permute the
466 // MFP above the FlatMap with this column disappearance.
467 let tweaked_mfp = {
468 let mut mfp = non_fused_mfp_above_flat_map.clone();
469 if mfp.demand().contains(&0) {
                                // We don't think the MFP can currently refer to the list column,
                                // because both the list column and the MFP were constructed by
                                // the HIR-to-MIR lowering, so it's not just some random MFP that
                                // we are seeing here. But anyhow, it's better to check this here
                                // for robustness against future code changes.
475 break 'fusion None;
476 }
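                            // Shift every remaining column left by one, accounting for the
                            // disappearing list column 0 (which we just checked the MFP does not
                            // reference): e.g., with input arity 3 the map is {1 -> 0, 2 -> 1}.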
477 let permutation: BTreeMap<_, _> =
478 (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
479 mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
480 mfp
481 };
482 // We now put together the project that was before the FlatMap, and the
483 // tweaked version of the MFP that was after the FlatMap.
484 // (Part of this MFP might be fused into the Reduce.)
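                        // (Informally: the composed MFP first projects the Reduce's output down
                        // to the single window-function column, and then applies the tweaked MFP
                        // on top of that.)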
485 let mut project_and_tweaked_mfp = {
486 let mut mfp = MapFilterProject::new(reduce_output_arity);
487 mfp = mfp.project(vec![num_grouping_keys]);
488 mfp = MapFilterProject::compose(mfp, tweaked_mfp);
489 mfp
490 };
491 let fused = self.lower_reduce(
492 input,
493 group_key,
494 aggregates,
495 monotonic,
496 expected_group_size,
497 &mut project_and_tweaked_mfp,
498 true,
499 )?;
500 // Update the residual MFP.
501 *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
502 Some(fused)
503 } else {
504 break 'fusion None;
505 }
506 };
507 if let Some(fused_with_reduce) = fused_with_reduce {
508 fused_with_reduce
509 } else {
510 // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
511 let (input, keys) = self.lower_mir_expr(flat_map_input)?;
512 // This stage can absorb arbitrary MFP instances.
513 let mfp = mfp.take();
514 let mut exprs = exprs.clone();
515 let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
516 {
517 // We don't permute the MFP here, because it runs _after_ the table function,
518 // whose output is in a fixed order.
519 //
520 // We _do_, however, need to permute the `expr`s that provide input to the
521 // `func`.
522 let permutation = permutation.iter().cloned().enumerate().collect();
523 for expr in &mut exprs {
524 expr.permute_map(&permutation);
525 }
526
527 Some(k.clone())
528 } else {
529 None
530 };
531
532 let lir_id = self.allocate_lir_id();
533 // Return the plan, and no arrangements.
534 (
535 PlanNode::FlatMap {
536 input_key,
537 input: Box::new(input),
538 exprs: exprs.clone(),
539 func: func.clone(),
540 mfp_after: mfp,
541 }
542 .as_plan(lir_id),
543 AvailableCollections::new_raw(),
544 )
545 }
546 }
547 MirRelationExpr::Join {
548 inputs,
549 equivalences,
550 implementation,
551 } => {
552 // Plan each of the join inputs independently.
553 // The `plans` get surfaced upwards, and the `input_keys` should
554 // be used as part of join planning / to validate the existing
555 // plans / to aid in indexed seeding of update streams.
556 let mut plans = Vec::new();
557 let mut input_keys = Vec::new();
558 let mut input_arities = Vec::new();
559 for input in inputs.iter() {
560 let (plan, keys) = self.lower_mir_expr(input)?;
561 input_arities.push(input.arity());
562 plans.push(plan);
563 input_keys.push(keys);
564 }
565
566 let input_mapper =
567 JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
568
569 // Extract temporal predicates as joins cannot currently absorb them.
570 let (plan, missing) = match implementation {
571 IndexedFilter(_coll_id, _idx_id, key, _val) => {
572 // Start with the constant input. (This used to be important before database-issues#4016
573 // was fixed.)
574 let start: usize = 1;
575 let order = vec![(0usize, key.clone(), None)];
576 // All columns of the constant input will be part of the arrangement key.
577 let source_arrangement = (
578 (0..key.len())
579 .map(MirScalarExpr::column)
580 .collect::<Vec<_>>(),
581 (0..key.len()).collect::<Vec<_>>(),
582 Vec::<usize>::new(),
583 );
584 let (ljp, missing) = LinearJoinPlan::create_from(
585 start,
586 Some(&source_arrangement),
587 equivalences,
588 &order,
589 input_mapper,
590 &mut mfp,
591 &input_keys,
592 );
593 (JoinPlan::Linear(ljp), missing)
594 }
595 Differential((start, start_arr, _start_characteristic), order) => {
596 let source_arrangement = start_arr.as_ref().and_then(|key| {
597 input_keys[*start]
598 .arranged
599 .iter()
600 .find(|(k, _, _)| k == key)
601 .clone()
602 });
603 let (ljp, missing) = LinearJoinPlan::create_from(
604 *start,
605 source_arrangement,
606 equivalences,
607 order,
608 input_mapper,
609 &mut mfp,
610 &input_keys,
611 );
612 (JoinPlan::Linear(ljp), missing)
613 }
614 DeltaQuery(orders) => {
615 let (djp, missing) = DeltaJoinPlan::create_from(
616 equivalences,
617 orders,
618 input_mapper,
619 &mut mfp,
620 &input_keys,
621 );
622 (JoinPlan::Delta(djp), missing)
623 }
624 // Other plans are errors, and should be reported as such.
625 Unimplemented => return Err("unimplemented join".to_string()),
626 };
                // The renderer will expect certain arrangements to exist; if any of those are
                // not available, the join planning functions above should have returned them in
                // `missing`. We thus need to plan them here so they'll exist.
629 let is_delta = matches!(plan, JoinPlan::Delta(_));
630 for (((input_plan, input_keys), missing), arity) in plans
631 .iter_mut()
632 .zip(input_keys.iter())
633 .zip(missing.into_iter())
634 .zip(input_arities.iter().cloned())
635 {
636 if missing != Default::default() {
637 if is_delta {
638 // join_implementation.rs produced a sub-optimal plan here;
639 // we shouldn't plan delta joins at all if not all of the required
640 // arrangements are available. Soft panic in CI and log an error in
641 // production to increase the chances that we will catch all situations
642 // that violate this constraint.
643 soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
644Dataflow info: {}
645This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
646 } else {
647 soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
648Dataflow info: {}
649This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
650 // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
651 // Join input. (Earlier, they were missing in the following cases:
652 // - They were const-folded away for constant inputs. This is not
653 // happening since
654 // https://github.com/MaterializeInc/materialize/pull/16351
655 // - They were not being inserted for the constant input of
656 // `IndexedFilter`s. This was fixed in
657 // https://github.com/MaterializeInc/materialize/pull/20920
658 // - They were not being inserted for the first input of Differential
659 // joins. This was fixed in
660 // https://github.com/MaterializeInc/materialize/pull/16099)
661 }
662 let lir_id = self.allocate_lir_id();
663 let raw_plan = std::mem::replace(
664 input_plan,
665 PlanNode::Constant {
666 rows: Ok(Vec::new()),
667 }
668 .as_plan(lir_id),
669 );
670 *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
671 }
672 }
673 // Return the plan, and no arrangements.
674 let lir_id = self.allocate_lir_id();
675 (
676 PlanNode::Join {
677 inputs: plans,
678 plan,
679 }
680 .as_plan(lir_id),
681 AvailableCollections::new_raw(),
682 )
683 }
684 MirRelationExpr::Reduce {
685 input,
686 group_key,
687 aggregates,
688 monotonic,
689 expected_group_size,
690 } => {
691 if aggregates
692 .iter()
693 .any(|agg| agg.func.can_fuse_with_unnest_list())
694 {
695 // This case should have been handled at the `MirRelationExpr::FlatMap` case
696 // above. But that has a pretty complicated pattern matching, so it's not
697 // unthinkable that it fails.
698 soft_panic_or_log!(
699 "Window function performance issue: `reduce_unnest_list_fusion` failed"
700 );
701 }
702 self.lower_reduce(
703 input,
704 group_key,
705 aggregates,
706 monotonic,
707 expected_group_size,
708 &mut mfp,
709 false,
710 )?
711 }
712 MirRelationExpr::TopK {
713 input,
714 group_key,
715 order_key,
716 limit,
717 offset,
718 monotonic,
719 expected_group_size,
720 } => {
721 let arity = input.arity();
722 let (input, keys) = self.lower_mir_expr(input)?;
723
724 let top_k_plan = TopKPlan::create_from(
725 group_key.clone(),
726 order_key.clone(),
727 *offset,
728 limit.clone(),
729 arity,
730 *monotonic,
731 *expected_group_size,
732 );
733
734 // We don't have an MFP here -- install an operator to permute the
735 // input, if necessary.
736 let input = if !keys.raw {
737 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
738 } else {
739 input
740 };
741 // Return the plan, and no arrangements.
742 let lir_id = self.allocate_lir_id();
743 (
744 PlanNode::TopK {
745 input: Box::new(input),
746 top_k_plan,
747 }
748 .as_plan(lir_id),
749 AvailableCollections::new_raw(),
750 )
751 }
752 MirRelationExpr::Negate { input } => {
753 let arity = input.arity();
754 let (input, keys) = self.lower_mir_expr(input)?;
755
756 // We don't have an MFP here -- install an operator to permute the
757 // input, if necessary.
758 let input = if !keys.raw {
759 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
760 } else {
761 input
762 };
763 // Return the plan, and no arrangements.
764 let lir_id = self.allocate_lir_id();
765 (
766 PlanNode::Negate {
767 input: Box::new(input),
768 }
769 .as_plan(lir_id),
770 AvailableCollections::new_raw(),
771 )
772 }
773 MirRelationExpr::Threshold { input } => {
774 let (plan, keys) = self.lower_mir_expr(input)?;
775 let arity = input.arity();
776 let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
777 let plan = if !keys
778 .arranged
779 .iter()
780 .any(|(key, _, _)| key == &required_arrangement.0)
781 {
782 self.arrange_by(
783 plan,
784 AvailableCollections::new_arranged(vec![required_arrangement]),
785 &keys,
786 arity,
787 )
788 } else {
789 plan
790 };
791
792 let output_keys = threshold_plan.keys();
793 // Return the plan, and any produced keys.
794 let lir_id = self.allocate_lir_id();
795 (
796 PlanNode::Threshold {
797 input: Box::new(plan),
798 threshold_plan,
799 }
800 .as_plan(lir_id),
801 output_keys,
802 )
803 }
804 MirRelationExpr::Union { base, inputs } => {
805 let arity = base.arity();
806 let mut plans_keys = Vec::with_capacity(1 + inputs.len());
807 let (plan, keys) = self.lower_mir_expr(base)?;
808 plans_keys.push((plan, keys));
809 for input in inputs.iter() {
810 let (plan, keys) = self.lower_mir_expr(input)?;
811 plans_keys.push((plan, keys));
812 }
813 let plans = plans_keys
814 .into_iter()
815 .map(|(plan, keys)| {
816 // We don't have an MFP here -- install an operator to permute the
817 // input, if necessary.
818 if !keys.raw {
819 self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
820 } else {
821 plan
822 }
823 })
824 .collect();
825 // Return the plan and no arrangements.
826 let lir_id = self.allocate_lir_id();
827 (
828 PlanNode::Union {
829 inputs: plans,
830 consolidate_output: false,
831 }
832 .as_plan(lir_id),
833 AvailableCollections::new_raw(),
834 )
835 }
836 MirRelationExpr::ArrangeBy { input, keys } => {
837 let input_mir = input;
838 let (input, mut input_keys) = self.lower_mir_expr(input)?;
                // Determine the arity of the input, which we need to construct permutations below.
840 let arity = input_mir.arity();
841
842 // Determine keys that are not present in `input_keys`.
843 let new_keys = keys
844 .iter()
845 .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
846 .cloned()
847 .collect::<Vec<_>>();
848 if new_keys.is_empty() {
849 (input, input_keys)
850 } else {
851 let mut new_keys = new_keys
852 .iter()
853 .cloned()
854 .map(|k| {
855 let (permutation, thinning) = permutation_for_arrangement(&k, arity);
856 (k, permutation, thinning)
857 })
858 .collect::<Vec<_>>();
859 let forms = AvailableCollections {
860 raw: input_keys.raw,
861 arranged: new_keys.clone(),
862 };
863 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
864 input_keys.arbitrary_arrangement()
865 {
866 let mut mfp = MapFilterProject::new(arity);
867 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
868 (Some(input_key.clone()), mfp)
869 } else {
870 (None, MapFilterProject::new(arity))
871 };
872 input_keys.arranged.append(&mut new_keys);
873 input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
874
875 // Return the plan and extended keys.
876 let lir_id = self.allocate_lir_id();
877 (
878 PlanNode::ArrangeBy {
879 input_key,
880 input: Box::new(input),
881 input_mfp,
882 forms,
883 }
884 .as_plan(lir_id),
885 input_keys,
886 )
887 }
888 }
889 };
890
891 // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
892 if !mfp.is_identity() {
893 // Seek out an arrangement key that might be constrained to a literal.
894 // TODO: Improve key selection heuristic.
895 let key_val = keys
896 .arranged
897 .iter()
898 .filter_map(|(key, permutation, thinning)| {
899 let mut mfp = mfp.clone();
900 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
901 mfp.literal_constraints(key)
902 .map(|val| (key.clone(), permutation, thinning, val))
903 })
904 .max_by_key(|(key, _, _, _)| key.len());
905
            // Input key selection strategy:
            // (1) If we can read a key at a particular value, do so.
            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
            //     therefore allows us to avoid discarding the arrangement, use that.
            // (3) Otherwise, if there is _some_ key, use that.
            // (4) Otherwise, just read the raw collection.
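            //
            // For example (illustrative only): if the leftover `mfp` is the filter `#0 = 7` and
            // the collection is arranged by `[#0]`, case (1) applies and the `Mfp` stage below is
            // seeded with that key and literal value; if the `mfp` merely undoes an arrangement's
            // permutation, case (2) lets us keep that arrangement around.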
912 let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
913 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
914
915 Some((key, Some(val)))
916 } else if let Some((key, permutation, thinning)) =
917 keys.arranged.iter().find(|(key, permutation, thinning)| {
918 let mut mfp = mfp.clone();
919 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
920 mfp.is_identity()
921 })
922 {
923 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
924 Some((key.clone(), None))
925 } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
926 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
927 Some((key.clone(), None))
928 } else {
929 None
930 };
931
932 if mfp.is_identity() {
                // We have discovered a key whose permutation causes the MFP to actually be the
                // identity! We can keep it around, but without its permutation this time, and
                // with a trivial thinning of the right length.
938 let (key, val) = input_key_val.unwrap();
939 let (_old_key, old_permutation, old_thinning) = keys
940 .arranged
941 .iter_mut()
942 .find(|(key2, _, _)| key2 == &key)
943 .unwrap();
944 *old_permutation = (0..mfp.input_arity).collect();
945 let old_thinned_arity = old_thinning.len();
946 *old_thinning = (0..old_thinned_arity).collect();
947 // Get rid of all other forms, as this is now the only one known to be valid.
                // TODO[btv] we can probably save the other arrangements too, if we adjust their
                // permutations. This is not hard to do, but we leave it for a quick follow-up to
                // avoid making the present diff too unwieldy.
950 keys.arranged.retain(|(key2, _, _)| key2 == &key);
951 keys.raw = false;
952
953 // Creating a Plan::Mfp node is now logically unnecessary, but we
954 // should do so anyway when `val` is populated, so that
955 // the `key_val` optimization gets applied.
956 let lir_id = self.allocate_lir_id();
957 if val.is_some() {
958 plan = PlanNode::Mfp {
959 input: Box::new(plan),
960 mfp,
961 input_key_val: Some((key, val)),
962 }
963 .as_plan(lir_id)
964 }
965 } else {
966 let lir_id = self.allocate_lir_id();
967 plan = PlanNode::Mfp {
968 input: Box::new(plan),
969 mfp,
970 input_key_val,
971 }
972 .as_plan(lir_id);
973 keys = AvailableCollections::new_raw();
974 }
975 }
976
977 Ok((plan, keys))
978 }
979
    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, the MFP that originally
    /// sits on top of the `Reduce`. This MFP, or a part of it, might be fused into the
    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., whatever
    /// was not fused.
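    ///
    /// When `fused_unnest_list` is true, the reduce is planned such that a subsequent
    /// `FlatMap UnnestList` becomes unnecessary (see the fusion logic in the `FlatMap` arm
    /// of `lower_mir_expr_stack_safe`).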
984 fn lower_reduce<T: Timestamp>(
985 &mut self,
986 input: &MirRelationExpr,
987 group_key: &Vec<MirScalarExpr>,
988 aggregates: &Vec<AggregateExpr>,
989 monotonic: &bool,
990 expected_group_size: &Option<u64>,
991 mfp_on_top: &mut MapFilterProject,
992 fused_unnest_list: bool,
993 ) -> Result<(Plan<T>, AvailableCollections), String> {
994 let input_arity = input.arity();
995 let (input, keys) = self.lower_mir_expr(input)?;
996 let (input_key, permutation_and_new_arity) =
997 if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
998 (
999 Some(input_key.clone()),
1000 Some((permutation.clone(), thinning.len() + input_key.len())),
1001 )
1002 } else {
1003 (None, None)
1004 };
1005 let key_val_plan = KeyValPlan::new(
1006 input_arity,
1007 group_key,
1008 aggregates,
1009 permutation_and_new_arity,
1010 );
1011 let reduce_plan = ReducePlan::create_from(
1012 aggregates.clone(),
1013 *monotonic,
1014 *expected_group_size,
1015 fused_unnest_list,
1016 );
1017 // Return the plan, and the keys it produces.
1018 let mfp_after;
1019 let output_arity;
1020 if self.enable_reduce_mfp_fusion {
1021 (mfp_after, *mfp_on_top, output_arity) =
1022 reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1023 } else {
1024 (mfp_after, output_arity) = (
1025 MapFilterProject::new(mfp_on_top.input_arity),
1026 group_key.len() + aggregates.len(),
1027 );
1028 }
1029 soft_assert_eq_or_log!(
1030 mfp_on_top.input_arity,
1031 output_arity,
1032 "Output arity of reduce must match input arity for MFP on top of it"
1033 );
1034 let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1035 let lir_id = self.allocate_lir_id();
1036 Ok((
1037 PlanNode::Reduce {
1038 input_key,
1039 input: Box::new(input),
1040 key_val_plan,
1041 plan: reduce_plan,
1042 mfp_after,
1043 }
1044 .as_plan(lir_id),
1045 output_keys,
1046 ))
1047 }
1048
    /// Replaces the plan with another one that makes the collection available in the
    /// additional `collections` forms, either by extending an existing `ArrangeBy` node or
    /// by wrapping the plan in a new one.
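    ///
    /// The `old_collections` are consulted only to pick an input key (and the corresponding
    /// permutation-undoing MFP) for a newly created `ArrangeBy`; `arity` is the arity of the
    /// collection being arranged.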
1051 pub fn arrange_by<T>(
1052 &mut self,
1053 plan: Plan<T>,
1054 collections: AvailableCollections,
1055 old_collections: &AvailableCollections,
1056 arity: usize,
1057 ) -> Plan<T> {
1058 if let Plan {
1059 node:
1060 PlanNode::ArrangeBy {
1061 input_key,
1062 input,
1063 input_mfp,
1064 mut forms,
1065 },
1066 lir_id,
1067 } = plan
1068 {
1069 forms.raw |= collections.raw;
1070 forms.arranged.extend(collections.arranged);
1071 forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1072 forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1073 PlanNode::ArrangeBy {
1074 input_key,
1075 input,
1076 input_mfp,
1077 forms,
1078 }
1079 .as_plan(lir_id)
1080 } else {
1081 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1082 old_collections.arbitrary_arrangement()
1083 {
1084 let mut mfp = MapFilterProject::new(arity);
1085 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1086 (Some(input_key.clone()), mfp)
1087 } else {
1088 (None, MapFilterProject::new(arity))
1089 };
1090 let lir_id = self.allocate_lir_id();
1091
1092 PlanNode::ArrangeBy {
1093 input_key,
1094 input: Box::new(plan),
1095 input_mfp,
1096 forms: collections,
1097 }
1098 .as_plan(lir_id)
1099 }
1100 }
1101}
1102
1103/// Various bits of state to print along with error messages during LIR planning,
1104/// to aid debugging.
1105#[derive(Clone, Debug)]
1106pub struct LirDebugInfo {
1107 debug_name: String,
1108 id: GlobalId,
1109}
1110
1111impl std::fmt::Display for LirDebugInfo {
1112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1113 write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1114 }
1115}