mz_compute_types/plan/lowering.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
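//!
//! Lowering walks each `MirRelationExpr`, peeling off `MapFilterProject` operators and
//! tracking which arrangements are available, and produces a `Plan` that the rendering
//! layer can execute directly.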
11
12use std::collections::BTreeMap;
13
14use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
15use mz_expr::{
16 AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
17 OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
18};
19use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
20use mz_repr::GlobalId;
21use mz_repr::optimize::OptimizerFeatures;
22use timely::progress::Timestamp;
23
24use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
25use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
26use crate::plan::reduce::{KeyValPlan, ReducePlan};
27use crate::plan::threshold::ThresholdPlan;
28use crate::plan::top_k::TopKPlan;
29use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
30
31pub(super) struct Context {
32 /// Known bindings to (possibly arranged) collections.
33 arrangements: BTreeMap<Id, AvailableCollections>,
34 /// Tracks the next available `LirId`.
35 next_lir_id: LirId,
36 /// Information to print along with error messages.
37 debug_info: LirDebugInfo,
38 /// Whether to enable fusion of MFPs in reductions.
39 enable_reduce_mfp_fusion: bool,
40}
41
42impl Context {
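/// Creates a fresh lowering context with no known arrangements, `LirId` allocation
/// starting at 1, and the given debug name and optimizer feature flags.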
43 pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
44 Self {
45 arrangements: Default::default(),
46 next_lir_id: LirId(1),
47 debug_info: LirDebugInfo {
48 debug_name,
49 id: GlobalId::Transient(0),
50 },
51 enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
52 }
53 }
54
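/// Returns the next unused `LirId` and advances the internal counter, panicking if
/// the counter would overflow.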
55 fn allocate_lir_id(&mut self) -> LirId {
56 let id = self.next_lir_id;
57 self.next_lir_id = LirId(
58 self.next_lir_id
59 .0
60 .checked_add(1)
61 .expect("No LirId overflow"),
62 );
63 id
64 }
65
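/// Lowers an entire dataflow description from MIR to LIR: first records the arrangements
/// made available by imported indexes (and the raw collections of imported sources), then
/// lowers each object to build in order, registering the arrangements each one produces
/// for use by later objects.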
66 pub fn lower<T: Timestamp>(
67 mut self,
68 desc: DataflowDescription<OptimizedMirRelationExpr>,
69 ) -> Result<DataflowDescription<Plan<T>>, String> {
70 // Sources might provide arranged forms of their data, in the future.
71 // Indexes provide arranged forms of their data.
72 for IndexImport {
73 desc: index_desc,
74 typ,
75 ..
76 } in desc.index_imports.values()
77 {
78 let key = index_desc.key.clone();
79 // TODO[btv] - We should be told the permutation by
80 // `index_desc`, and it should have been generated
81 // at the same point the thinning logic was.
82 //
83 // We should for sure do that soon, but it requires
84 // a bit of a refactor, so for now we just
85 // _assume_ that they were both generated by `permutation_for_arrangement`,
86 // and recover them here.
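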
87 let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
88 let index_keys = self
89 .arrangements
90 .entry(Id::Global(index_desc.on_id))
91 .or_insert_with(AvailableCollections::default);
92 index_keys.arranged.push((key, permutation, thinning));
93 index_keys.types = Some(typ.column_types.clone());
94 }
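// Imported sources are currently available only as raw (unarranged) collections.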
95 for id in desc.source_imports.keys() {
96 self.arrangements
97 .entry(Id::Global(*id))
98 .or_insert_with(AvailableCollections::new_raw);
99 }
100
101 // Build each object in order, registering the arrangements it forms.
102 let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
103 for build in desc.objects_to_build {
104 self.debug_info.id = build.id;
105 let (plan, keys) = self.lower_mir_expr(&build.plan)?;
106
107 self.arrangements.insert(Id::Global(build.id), keys);
108 objects_to_build.push(BuildDesc { id: build.id, plan });
109 }
110
111 Ok(DataflowDescription {
112 source_imports: desc.source_imports,
113 index_imports: desc.index_imports,
114 objects_to_build,
115 index_exports: desc.index_exports,
116 sink_exports: desc.sink_exports,
117 as_of: desc.as_of,
118 until: desc.until,
119 initial_storage_as_of: desc.initial_storage_as_of,
120 refresh_schedule: desc.refresh_schedule,
121 debug_name: desc.debug_name,
122 time_dependence: desc.time_dependence,
123 })
124 }
125
126 /// This method converts a `MirRelationExpr` into a plan that can be directly rendered.
127 ///
128 /// The rough structure is that we repeatedly extract map/filter/project operators
129 /// from each expression we see, bundle them up as a `MapFilterProject` object, and
130 /// then produce a plan for the combination of that with the next operator.
131 ///
132 /// The method accesses `self.arrangements`, which it will locally add to and remove from for
133 /// `Let` bindings (by the end of the call it should contain the same bindings as when it
134 /// started).
135 ///
136 /// The result of the method is both a `Plan` and a list of arrangements that
137 /// are certain to be produced, which can be relied on by the next steps in the plan.
138 /// Each of the arrangement keys is associated with an MFP that must be applied if that
139 /// arrangement is used, to back out the permutation associated with that arrangement.
140 ///
141 /// An empty list of arrangement keys indicates that only a `Collection` stream can
142 /// be assumed to exist.
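///
/// For intuition (illustrative only): if a three-column collection `(a, b, c)` is
/// arranged with key `[#1]`, the arrangement stores the key `(b)` together with the
/// thinned row `(a, c)`, and the associated MFP maps `(b, a, c)` back into the
/// original column order `(a, b, c)`.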
143 fn lower_mir_expr<T: Timestamp>(
144 &mut self,
145 expr: &MirRelationExpr,
146 ) -> Result<(Plan<T>, AvailableCollections), String> {
147 // This function is recursive and can overflow its stack, so grow it if
148 // needed. The growth here is unbounded. Our general solution for this problem
149 // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
150 // depth. That, however, requires upstream error handling. This function is
151 // currently called by the Coordinator after calls to `catalog_transact`,
152 // and thus is not allowed to fail. Until that call site can handle errors, we
153 // choose to allow the unbounded growth here. We are, though, somewhat protected
154 // by higher levels enforcing their own limits on stack depth (in the parser,
155 // transformer/desugarer, and planner).
156 mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
157 }
158
159 fn lower_mir_expr_stack_safe<T>(
160 &mut self,
161 expr: &MirRelationExpr,
162 ) -> Result<(Plan<T>, AvailableCollections), String>
163 where
164 T: Timestamp,
165 {
166 // Extract a maximally large MapFilterProject from `expr`.
167 // We will then try to push this into the resulting expression.
168 //
169 // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
170 // While we would eventually like all plan stages to be able to absorb such
171 // general operators, not all of them can.
172 let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
173 // We attempt to plan what we have remaining, in the context of `mfp`.
174 // We may not be able to do this everywhere, and may have to wrap some operators in an `Mfp` stage.
175 let (mut plan, mut keys) = match expr {
176 // These operators should have been extracted from the expression.
177 MirRelationExpr::Map { .. } => {
178 panic!("This operator should have been extracted");
179 }
180 MirRelationExpr::Filter { .. } => {
181 panic!("This operator should have been extracted");
182 }
183 MirRelationExpr::Project { .. } => {
184 panic!("This operator should have been extracted");
185 }
186 // These operators may not have been extracted, and need to result in a `Plan`.
187 MirRelationExpr::Constant { rows, typ: _ } => {
188 let lir_id = self.allocate_lir_id();
189 let node = PlanNode::Constant {
190 rows: rows.clone().map(|rows| {
191 rows.into_iter()
192 .map(|(row, diff)| (row, T::minimum(), diff))
193 .collect()
194 }),
195 };
196 // The plan, not arranged in any way.
197 (node.as_plan(lir_id), AvailableCollections::new_raw())
198 }
199 MirRelationExpr::Get { id, typ: _, .. } => {
200 // This stage can absorb arbitrary MFP operators.
201 let mut mfp = mfp.take();
202 // If `mfp` is the identity, we can surface all imported arrangements.
203 // Otherwise, we apply `mfp` and promise no arrangements.
204 let mut in_keys = self
205 .arrangements
206 .get(id)
207 .cloned()
208 .unwrap_or_else(AvailableCollections::new_raw);
209
210 // Seek out an arrangement key that might be constrained to a literal.
211 // Note: this code has very little use nowadays, as its job was mostly taken over
212 // by `LiteralConstraints` (see the longer comment below).
213 let key_val = in_keys
214 .arranged
215 .iter()
216 .filter_map(|key| {
217 mfp.literal_constraints(&key.0)
218 .map(|val| (key.clone(), val))
219 })
220 .max_by_key(|(key, _val)| key.0.len());
221
222 // Determine the plan of action for the `Get` stage.
223 let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
224 // This code path used to handle looking up literals from indexes, but it's
225 // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
226 // MIR transform instead. However, it's still called in a couple of tricky
227 // special cases:
228 // - `LiteralConstraints` handles only Gets of global ids, so this code still
229 // gets to handle Filters on top of Gets of local ids.
230 // - Lowering does a `MapFilterProject::extract_from_expression`, while
231 // `LiteralConstraints` does
232 // `MapFilterProject::extract_non_errors_from_expr_mut`.
233 // - It might happen that new literal constraint optimization opportunities
234 // appear somewhere near the end of the MIR optimizer after
235 // `LiteralConstraints` has already run.
236 // (Note that similar literal constraint handling machinery is also
237 // present when handling the leftover MFP after this big match.)
238 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
239 in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
240 GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
241 } else if !mfp.is_identity() {
242 // We need to ensure a collection exists, which means we must form it.
243 if let Some((key, permutation, thinning)) =
244 in_keys.arbitrary_arrangement().cloned()
245 {
246 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
247 in_keys.arranged = vec![(key.clone(), permutation, thinning)];
248 GetPlan::Arrangement(key, None, mfp)
249 } else {
250 GetPlan::Collection(mfp)
251 }
252 } else {
253 // By default, just pass input arrangements through.
254 GetPlan::PassArrangements
255 };
256
257 let out_keys = if let GetPlan::PassArrangements = plan {
258 in_keys.clone()
259 } else {
260 AvailableCollections::new_raw()
261 };
262
263 let lir_id = self.allocate_lir_id();
264 let node = PlanNode::Get {
265 id: id.clone(),
266 keys: in_keys,
267 plan,
268 };
269 // Return the plan, and any keys if an identity `mfp`.
270 (node.as_plan(lir_id), out_keys)
271 }
272 MirRelationExpr::Let { id, value, body } => {
273 // It would be unfortunate to have a non-trivial `mfp` here, as we hope
274 // that it would have been pushed down. It is not clear whether we should
275 // take the initiative to push down the `mfp` ourselves.
276
277 // Plan the value using only the initial arrangements, but
278 // introduce any resulting arrangements bound to `id`.
279 let (value, v_keys) = self.lower_mir_expr(value)?;
280 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
281 assert_none!(pre_existing);
282 // Plan the body using initial and `value` arrangements,
283 // and then remove reference to the value arrangements.
284 let (body, b_keys) = self.lower_mir_expr(body)?;
285 self.arrangements.remove(&Id::Local(*id));
286 // Return the plan, and any `body` arrangements.
287 let lir_id = self.allocate_lir_id();
288 (
289 PlanNode::Let {
290 id: id.clone(),
291 value: Box::new(value),
292 body: Box::new(body),
293 }
294 .as_plan(lir_id),
295 b_keys,
296 )
297 }
298 MirRelationExpr::LetRec {
299 ids,
300 values,
301 limits,
302 body,
303 } => {
304 assert_eq!(ids.len(), values.len());
305 assert_eq!(ids.len(), limits.len());
306 // Plan the values using only the available arrangements, but
307 // introduce any resulting arrangements bound to each `id`.
308 // Arrangements made available cannot be used by prior bindings,
309 // as we cannot circulate an arrangement through a `Variable` yet.
310 let mut lir_values = Vec::with_capacity(values.len());
311 for (id, value) in ids.iter().zip(values) {
312 let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
313 // If `v_keys` does not contain an unarranged collection, we must form it.
314 if !v_keys.raw {
315 // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
316 let (input_key, permutation, thinning) =
317 v_keys.arbitrary_arrangement().unwrap();
318 let mut input_mfp = MapFilterProject::new(value.arity());
319 input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
320 let input_key = Some(input_key.clone());
321
322 let forms = AvailableCollections::new_raw();
323
324 // We just want to insert an `ArrangeBy` to form an unarranged collection,
325 // but there is a complication: We shouldn't break the invariant (created by
326 // `NormalizeLets`, and relied upon by the rendering) that there isn't
327 // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
328 // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
329 // instead of on top of the inner `LetRec`.
330 lir_value = match lir_value {
331 Plan {
332 node:
333 PlanNode::LetRec {
334 ids,
335 values,
336 limits,
337 body,
338 },
339 lir_id,
340 } => {
341 let inner_lir_id = self.allocate_lir_id();
342 PlanNode::LetRec {
343 ids,
344 values,
345 limits,
346 body: Box::new(
347 PlanNode::ArrangeBy {
348 input: body,
349 forms,
350 input_key,
351 input_mfp,
352 }
353 .as_plan(inner_lir_id),
354 ),
355 }
356 .as_plan(lir_id)
357 }
358 lir_value => {
359 let lir_id = self.allocate_lir_id();
360 PlanNode::ArrangeBy {
361 input: Box::new(lir_value),
362 forms,
363 input_key,
364 input_mfp,
365 }
366 .as_plan(lir_id)
367 }
368 };
369 v_keys.raw = true;
370 }
371 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
372 assert_none!(pre_existing);
373 lir_values.push(lir_value);
374 }
375 // As we exit the iterative scope, we must leave all arrangements behind,
376 // as they reference a timestamp coordinate that must be stripped off.
377 for id in ids.iter() {
378 self.arrangements
379 .insert(Id::Local(*id), AvailableCollections::new_raw());
380 }
381 // Plan the body using initial and `value` arrangements,
382 // and then remove reference to the value arrangements.
383 let (body, b_keys) = self.lower_mir_expr(body)?;
384 for id in ids.iter() {
385 self.arrangements.remove(&Id::Local(*id));
386 }
387 // Return the plan, and any `body` arrangements.
388 let lir_id = self.allocate_lir_id();
389 (
390 PlanNode::LetRec {
391 ids: ids.clone(),
392 values: lir_values,
393 limits: limits.clone(),
394 body: Box::new(body),
395 }
396 .as_plan(lir_id),
397 b_keys,
398 )
399 }
400 MirRelationExpr::FlatMap {
401 input: flat_map_input,
402 func,
403 exprs,
404 } => {
405 // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
406 // fused into the lowered `Reduce`.
407 //
408 // In theory, we could also have implemented this as an MIR transform. However, this
409 // is more of a physical optimization, and such optimizations are sometimes unpleasant
410 // to make part of the MIR pipeline. The specific problem with putting this into the MIR
411 // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
412 // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
413 // emit multiple rows per group. Such semantic changes of MIR are very scary, since
414 // various parts of the optimizer assume that Reduce emits only 1 row per group, and
415 // it would be very hard to hunt down all these parts. (For example, key inference
416 // infers the group key as a unique key.)
417 let fused_with_reduce = 'fusion: {
418 if !matches!(func, TableFunc::UnnestList { .. }) {
419 break 'fusion None;
420 }
421 // We might have a Project of a single col between the FlatMap and the
422 // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
423 // result of the window function.)
424 let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
425 input: project_input,
426 outputs: projection,
427 } = &**flat_map_input
428 {
429 // We want this to be a single column, because we'll want to deal with only
430 // one aggregation in the `Reduce`. (The aggregation of a window function
431 // always stands alone currently: we plan them separately from other
432 // aggregations, and Reduces are never fused. When window functions are
433 // fused with each other, they end up in one aggregation. When there are
434 // multiple window functions in the same SELECT that can't be fused, they
435 // end up in different Reduces.)
436 if let &[single_col] = &**projection {
437 (project_input, single_col)
438 } else {
439 break 'fusion None;
440 }
441 } else {
442 (flat_map_input, 0)
443 };
444 if let MirRelationExpr::Reduce {
445 input,
446 group_key,
447 aggregates,
448 monotonic,
449 expected_group_size,
450 } = &**maybe_reduce
451 {
452 if group_key.len() != num_grouping_keys
453 || aggregates.len() != 1
454 || !aggregates[0].func.can_fuse_with_unnest_list()
455 {
456 break 'fusion None;
457 }
458 // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
459 // above the FlatMap. Later, we'll mutate this to be the residual MFP that
460 // didn't get fused into the `Reduce`.
461 let non_fused_mfp_above_flat_map = &mut mfp;
462 let reduce_output_arity = num_grouping_keys + 1;
463 // We are fusing away the list that the FlatMap would have been unnesting.
464 // The column that held that list therefore disappears, so we have to permute
465 // the MFP above the FlatMap to account for the removed column.
466 let tweaked_mfp = {
467 let mut mfp = non_fused_mfp_above_flat_map.clone();
468 if mfp.demand().contains(&0) {
469 // We don't think this MFP can currently refer to the list column,
470 // because both the list column and the MFP were constructed by the
471 // HIR-to-MIR lowering, so it's not just some random MFP that we are
472 // seeing here. But it's better to check this here anyway, for
473 // robustness against future code changes.
474 break 'fusion None;
475 }
476 let permutation: BTreeMap<_, _> =
477 (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
478 mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
479 mfp
480 };
481 // We now put together the project that was before the FlatMap, and the
482 // tweaked version of the MFP that was after the FlatMap.
483 // (Part of this MFP might be fused into the Reduce.)
484 let mut project_and_tweaked_mfp = {
485 let mut mfp = MapFilterProject::new(reduce_output_arity);
486 mfp = mfp.project(vec![num_grouping_keys]);
487 mfp = MapFilterProject::compose(mfp, tweaked_mfp);
488 mfp
489 };
490 let fused = self.lower_reduce(
491 input,
492 group_key,
493 aggregates,
494 monotonic,
495 expected_group_size,
496 &mut project_and_tweaked_mfp,
497 true,
498 )?;
499 // Update the residual MFP.
500 *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
501 Some(fused)
502 } else {
503 break 'fusion None;
504 }
505 };
506 if let Some(fused_with_reduce) = fused_with_reduce {
507 fused_with_reduce
508 } else {
509 // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
510 let (input, keys) = self.lower_mir_expr(flat_map_input)?;
511 // This stage can absorb arbitrary MFP instances.
512 let mfp = mfp.take();
513 let mut exprs = exprs.clone();
514 let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
515 {
516 // We don't permute the MFP here, because it runs _after_ the table function,
517 // whose output is in a fixed order.
518 //
519 // We _do_, however, need to permute the `expr`s that provide input to the
520 // `func`.
521 let permutation = permutation.iter().cloned().enumerate().collect();
522 for expr in &mut exprs {
523 expr.permute_map(&permutation);
524 }
525
526 Some(k.clone())
527 } else {
528 None
529 };
530
531 let lir_id = self.allocate_lir_id();
532 // Return the plan, and no arrangements.
533 (
534 PlanNode::FlatMap {
535 input: Box::new(input),
536 func: func.clone(),
537 exprs: exprs.clone(),
538 mfp_after: mfp,
539 input_key,
540 }
541 .as_plan(lir_id),
542 AvailableCollections::new_raw(),
543 )
544 }
545 }
546 MirRelationExpr::Join {
547 inputs,
548 equivalences,
549 implementation,
550 } => {
551 // Plan each of the join inputs independently.
552 // The `plans` get surfaced upwards, and the `input_keys` should
553 // be used as part of join planning / to validate the existing
554 // plans / to aid in indexed seeding of update streams.
555 let mut plans = Vec::new();
556 let mut input_keys = Vec::new();
557 let mut input_arities = Vec::new();
558 for input in inputs.iter() {
559 let (plan, keys) = self.lower_mir_expr(input)?;
560 input_arities.push(input.arity());
561 plans.push(plan);
562 input_keys.push(keys);
563 }
564
565 let input_mapper =
566 JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
567
568 // Extract temporal predicates as joins cannot currently absorb them.
569 let (plan, missing) = match implementation {
570 IndexedFilter(_coll_id, _idx_id, key, _val) => {
571 // Start with the constant input. (This used to be important before database-issues#4016
572 // was fixed.)
573 let start: usize = 1;
574 let order = vec![(0usize, key.clone(), None)];
575 // All columns of the constant input will be part of the arrangement key.
576 let source_arrangement = (
577 (0..key.len())
578 .map(MirScalarExpr::Column)
579 .collect::<Vec<_>>(),
580 (0..key.len()).collect::<Vec<_>>(),
581 Vec::<usize>::new(),
582 );
583 let (ljp, missing) = LinearJoinPlan::create_from(
584 start,
585 Some(&source_arrangement),
586 equivalences,
587 &order,
588 input_mapper,
589 &mut mfp,
590 &input_keys,
591 );
592 (JoinPlan::Linear(ljp), missing)
593 }
594 Differential((start, start_arr, _start_characteristic), order) => {
595 let source_arrangement = start_arr.as_ref().and_then(|key| {
596 input_keys[*start]
597 .arranged
598 .iter()
599 .find(|(k, _, _)| k == key)
600 .clone()
601 });
602 let (ljp, missing) = LinearJoinPlan::create_from(
603 *start,
604 source_arrangement,
605 equivalences,
606 order,
607 input_mapper,
608 &mut mfp,
609 &input_keys,
610 );
611 (JoinPlan::Linear(ljp), missing)
612 }
613 DeltaQuery(orders) => {
614 let (djp, missing) = DeltaJoinPlan::create_from(
615 equivalences,
616 orders,
617 input_mapper,
618 &mut mfp,
619 &input_keys,
620 );
621 (JoinPlan::Delta(djp), missing)
622 }
623 // Other plans are errors, and should be reported as such.
624 Unimplemented => return Err("unimplemented join".to_string()),
625 };
626 // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
627 // `missing`. We thus need to plan them here so they'll exist.
628 let is_delta = matches!(plan, JoinPlan::Delta(_));
629 for (((input_plan, input_keys), missing), arity) in plans
630 .iter_mut()
631 .zip(input_keys.iter())
632 .zip(missing.into_iter())
633 .zip(input_arities.iter().cloned())
634 {
635 if missing != Default::default() {
636 if is_delta {
637 // join_implementation.rs produced a sub-optimal plan here;
638 // we shouldn't plan delta joins at all if not all of the required
639 // arrangements are available. Soft panic in CI and log an error in
640 // production to increase the chances that we will catch all situations
641 // that violate this constraint.
642 soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
643Dataflow info: {}
644This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
645 } else {
646 soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
647Dataflow info: {}
648This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
649 // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
650 // Join input. (Earlier, they were missing in the following cases:
651 // - They were const-folded away for constant inputs. This is not
652 // happening since
653 // https://github.com/MaterializeInc/materialize/pull/16351
654 // - They were not being inserted for the constant input of
655 // `IndexedFilter`s. This was fixed in
656 // https://github.com/MaterializeInc/materialize/pull/20920
657 // - They were not being inserted for the first input of Differential
658 // joins. This was fixed in
659 // https://github.com/MaterializeInc/materialize/pull/16099)
660 }
661 let lir_id = self.allocate_lir_id();
662 let raw_plan = std::mem::replace(
663 input_plan,
664 PlanNode::Constant {
665 rows: Ok(Vec::new()),
666 }
667 .as_plan(lir_id),
668 );
669 *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
670 }
671 }
672 // Return the plan, and no arrangements.
673 let lir_id = self.allocate_lir_id();
674 (
675 PlanNode::Join {
676 inputs: plans,
677 plan,
678 }
679 .as_plan(lir_id),
680 AvailableCollections::new_raw(),
681 )
682 }
683 MirRelationExpr::Reduce {
684 input,
685 group_key,
686 aggregates,
687 monotonic,
688 expected_group_size,
689 } => {
690 if aggregates
691 .iter()
692 .any(|agg| agg.func.can_fuse_with_unnest_list())
693 {
694 // This case should have been handled by the `MirRelationExpr::FlatMap` case
695 // above. But that involves pretty complicated pattern matching, so it's not
696 // unthinkable that it fails.
697 soft_panic_or_log!(
698 "Window function performance issue: `reduce_unnest_list_fusion` failed"
699 );
700 }
701 self.lower_reduce(
702 input,
703 group_key,
704 aggregates,
705 monotonic,
706 expected_group_size,
707 &mut mfp,
708 false,
709 )?
710 }
711 MirRelationExpr::TopK {
712 input,
713 group_key,
714 order_key,
715 limit,
716 offset,
717 monotonic,
718 expected_group_size,
719 } => {
720 let arity = input.arity();
721 let (input, keys) = self.lower_mir_expr(input)?;
722
723 let top_k_plan = TopKPlan::create_from(
724 group_key.clone(),
725 order_key.clone(),
726 *offset,
727 limit.clone(),
728 arity,
729 *monotonic,
730 *expected_group_size,
731 );
732
733 // We don't have an MFP here -- install an operator to permute the
734 // input, if necessary.
735 let input = if !keys.raw {
736 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
737 } else {
738 input
739 };
740 // Return the plan, and no arrangements.
741 let lir_id = self.allocate_lir_id();
742 (
743 PlanNode::TopK {
744 input: Box::new(input),
745 top_k_plan,
746 }
747 .as_plan(lir_id),
748 AvailableCollections::new_raw(),
749 )
750 }
751 MirRelationExpr::Negate { input } => {
752 let arity = input.arity();
753 let (input, keys) = self.lower_mir_expr(input)?;
754
755 // We don't have an MFP here -- install an operator to permute the
756 // input, if necessary.
757 let input = if !keys.raw {
758 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
759 } else {
760 input
761 };
762 // Return the plan, and no arrangements.
763 let lir_id = self.allocate_lir_id();
764 (
765 PlanNode::Negate {
766 input: Box::new(input),
767 }
768 .as_plan(lir_id),
769 AvailableCollections::new_raw(),
770 )
771 }
772 MirRelationExpr::Threshold { input } => {
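// Threshold requires a particular arrangement of its input; if that arrangement
// is not already available, install it here.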
773 let (plan, keys) = self.lower_mir_expr(input)?;
774 let arity = keys
775 .types
776 .as_ref()
777 .map(|types| types.len())
778 .unwrap_or_else(|| input.arity());
779 let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
780 let mut types = keys.types.clone();
781 let plan = if !keys
782 .arranged
783 .iter()
784 .any(|(key, _, _)| key == &required_arrangement.0)
785 {
786 types = Some(types.unwrap_or_else(|| input.typ().column_types));
787 self.arrange_by(
788 plan,
789 AvailableCollections::new_arranged(
790 vec![required_arrangement],
791 types.clone(),
792 ),
793 &keys,
794 arity,
795 )
796 } else {
797 plan
798 };
799
800 let output_keys = threshold_plan.keys(types);
801 // Return the plan, and any produced keys.
802 let lir_id = self.allocate_lir_id();
803 (
804 PlanNode::Threshold {
805 input: Box::new(plan),
806 threshold_plan,
807 }
808 .as_plan(lir_id),
809 output_keys,
810 )
811 }
812 MirRelationExpr::Union { base, inputs } => {
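// Lower the base and all other inputs, and ensure each of them is available as a
// raw collection before forming the union.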
813 let arity = base.arity();
814 let mut plans_keys = Vec::with_capacity(1 + inputs.len());
815 let (plan, keys) = self.lower_mir_expr(base)?;
816 plans_keys.push((plan, keys));
817 for input in inputs.iter() {
818 let (plan, keys) = self.lower_mir_expr(input)?;
819 plans_keys.push((plan, keys));
820 }
821 let plans = plans_keys
822 .into_iter()
823 .map(|(plan, keys)| {
824 // We don't have an MFP here -- install an operator to permute the
825 // input, if necessary.
826 if !keys.raw {
827 self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
828 } else {
829 plan
830 }
831 })
832 .collect();
833 // Return the plan and no arrangements.
834 let lir_id = self.allocate_lir_id();
835 (
836 PlanNode::Union {
837 inputs: plans,
838 consolidate_output: false,
839 }
840 .as_plan(lir_id),
841 AvailableCollections::new_raw(),
842 )
843 }
844 MirRelationExpr::ArrangeBy { input, keys } => {
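// Surface whatever arrangements the input already provides, and build any
// additionally requested keys that are not yet present.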
845 let input_mir = input;
846 let (input, mut input_keys) = self.lower_mir_expr(input)?;
847 // Fill the `types` in `input_keys` if not already present.
848 let input_types = input_keys
849 .types
850 .get_or_insert_with(|| input_mir.typ().column_types);
851 let arity = input_types.len();
852
853 // Determine keys that are not present in `input_keys`.
854 let new_keys = keys
855 .iter()
856 .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
857 .cloned()
858 .collect::<Vec<_>>();
859 if new_keys.is_empty() {
860 (input, input_keys)
861 } else {
862 let new_keys = new_keys.iter().cloned().map(|k| {
863 let (permutation, thinning) = permutation_for_arrangement(&k, arity);
864 (k, permutation, thinning)
865 });
866 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
867 input_keys.arbitrary_arrangement()
868 {
869 let mut mfp = MapFilterProject::new(arity);
870 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
871 (Some(input_key.clone()), mfp)
872 } else {
873 (None, MapFilterProject::new(arity))
874 };
875 input_keys.arranged.extend(new_keys);
876 input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
877
878 // Return the plan and extended keys.
879 let lir_id = self.allocate_lir_id();
880 (
881 PlanNode::ArrangeBy {
882 input: Box::new(input),
883 forms: input_keys.clone(),
884 input_key,
885 input_mfp,
886 }
887 .as_plan(lir_id),
888 input_keys,
889 )
890 }
891 }
892 };
893
894 // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
895 if !mfp.is_identity() {
896 // Seek out an arrangement key that might be constrained to a literal.
897 // TODO: Improve key selection heuristic.
898 let key_val = keys
899 .arranged
900 .iter()
901 .filter_map(|(key, permutation, thinning)| {
902 let mut mfp = mfp.clone();
903 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
904 mfp.literal_constraints(key)
905 .map(|val| (key.clone(), permutation, thinning, val))
906 })
907 .max_by_key(|(key, _, _, _)| key.len());
908
909 // Input key selection strategy:
910 // (1) If we can read a key at a particular value, do so.
911 // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
912 // therefore allows us to avoid discarding the arrangement, use that.
913 // (3) Otherwise, if there is _some_ key, use that.
914 // (4) Otherwise, just read the raw collection.
915 let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
916 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
917
918 Some((key, Some(val)))
919 } else if let Some((key, permutation, thinning)) =
920 keys.arranged.iter().find(|(key, permutation, thinning)| {
921 let mut mfp = mfp.clone();
922 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
923 mfp.is_identity()
924 })
925 {
926 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
927 Some((key.clone(), None))
928 } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
929 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
930 Some((key.clone(), None))
931 } else {
932 None
933 };
934
935 if mfp.is_identity() {
936 // We have discovered a key whose permutation causes the MFP to actually be
937 // the identity! We can keep it around, but without its permutation this time,
938 // and with a trivial thinning of the right length.
941 let (key, val) = input_key_val.unwrap();
942 let (_old_key, old_permutation, old_thinning) = keys
943 .arranged
944 .iter_mut()
945 .find(|(key2, _, _)| key2 == &key)
946 .unwrap();
947 *old_permutation = (0..mfp.input_arity).collect();
948 let old_thinned_arity = old_thinning.len();
949 *old_thinning = (0..old_thinned_arity).collect();
950 // Get rid of all other forms, as this is now the only one known to be valid.
951 // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
952 // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
953 keys.arranged.retain(|(key2, _, _)| key2 == &key);
954 keys.raw = false;
955
956 // Creating a Plan::Mfp node is now logically unnecessary, but we
957 // should do so anyway when `val` is populated, so that
958 // the `key_val` optimization gets applied.
959 let lir_id = self.allocate_lir_id();
960 if val.is_some() {
961 plan = PlanNode::Mfp {
962 input: Box::new(plan),
963 mfp,
964 input_key_val: Some((key, val)),
965 }
966 .as_plan(lir_id)
967 }
968 } else {
969 let lir_id = self.allocate_lir_id();
970 plan = PlanNode::Mfp {
971 input: Box::new(plan),
972 mfp,
973 input_key_val,
974 }
975 .as_plan(lir_id);
976 keys = AvailableCollections::new_raw();
977 }
978 }
979
980 Ok((plan, keys))
981 }
982
983 /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
984 /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
985 /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
986 /// fused.
987 fn lower_reduce<T: Timestamp>(
988 &mut self,
989 input: &MirRelationExpr,
990 group_key: &Vec<MirScalarExpr>,
991 aggregates: &Vec<AggregateExpr>,
992 monotonic: &bool,
993 expected_group_size: &Option<u64>,
994 mfp_on_top: &mut MapFilterProject,
995 fused_unnest_list: bool,
996 ) -> Result<(Plan<T>, AvailableCollections), String> {
997 let input_arity = input.arity();
998 let (input, keys) = self.lower_mir_expr(input)?;
999 let (input_key, permutation_and_new_arity) =
1000 if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
1001 (
1002 Some(input_key.clone()),
1003 Some((permutation.clone(), thinning.len() + input_key.len())),
1004 )
1005 } else {
1006 (None, None)
1007 };
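// Plan how to extract the grouping key and aggregate inputs from each input row,
// and how to evaluate the aggregations themselves.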
1008 let key_val_plan = KeyValPlan::new(
1009 input_arity,
1010 group_key,
1011 aggregates,
1012 permutation_and_new_arity,
1013 );
1014 let reduce_plan = ReducePlan::create_from(
1015 aggregates.clone(),
1016 *monotonic,
1017 *expected_group_size,
1018 fused_unnest_list,
1019 );
1020 // Return the plan, and the keys it produces.
1021 let mfp_after;
1022 let output_arity;
1023 if self.enable_reduce_mfp_fusion {
1024 (mfp_after, *mfp_on_top, output_arity) =
1025 reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1026 } else {
1027 (mfp_after, output_arity) = (
1028 MapFilterProject::new(mfp_on_top.input_arity),
1029 group_key.len() + aggregates.len(),
1030 );
1031 }
1032 soft_assert_eq_or_log!(
1033 mfp_on_top.input_arity,
1034 output_arity,
1035 "Output arity of reduce must match input arity for MFP on top of it"
1036 );
1037 let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1038 let lir_id = self.allocate_lir_id();
1039 Ok((
1040 PlanNode::Reduce {
1041 input: Box::new(input),
1042 key_val_plan,
1043 plan: reduce_plan,
1044 input_key,
1045 mfp_after,
1046 }
1047 .as_plan(lir_id),
1048 output_keys,
1049 ))
1050 }
1051
1052 /// Replace the plan with another one that has the collection available in
1053 /// some additional forms.
1054 pub fn arrange_by<T>(
1055 &mut self,
1056 plan: Plan<T>,
1057 collections: AvailableCollections,
1058 old_collections: &AvailableCollections,
1059 arity: usize,
1060 ) -> Plan<T> {
1061 if let Plan {
1062 node:
1063 PlanNode::ArrangeBy {
1064 input,
1065 mut forms,
1066 input_key,
1067 input_mfp,
1068 },
1069 lir_id,
1070 } = plan
1071 {
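// The plan is already an `ArrangeBy`: merge the newly requested forms into it.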
1072 forms.raw |= collections.raw;
1073 forms.arranged.extend(collections.arranged);
1074 forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1075 forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1076 if forms.types.is_none() {
1077 forms.types = collections.types;
1078 } else {
1079 assert!(collections.types.is_none() || collections.types == forms.types);
1080 }
1081 PlanNode::ArrangeBy {
1082 input,
1083 forms,
1084 input_key,
1085 input_mfp,
1086 }
1087 .as_plan(lir_id)
1088 } else {
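// Otherwise, wrap the plan in a new `ArrangeBy` node that produces the requested forms.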
1089 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1090 old_collections.arbitrary_arrangement()
1091 {
1092 let mut mfp = MapFilterProject::new(arity);
1093 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1094 (Some(input_key.clone()), mfp)
1095 } else {
1096 (None, MapFilterProject::new(arity))
1097 };
1098 let lir_id = self.allocate_lir_id();
1099 PlanNode::ArrangeBy {
1100 input: Box::new(plan),
1101 forms: collections,
1102 input_key,
1103 input_mfp,
1104 }
1105 .as_plan(lir_id)
1106 }
1107 }
1108}
1109
1110/// Various bits of state to print along with error messages during LIR planning,
1111/// to aid debugging.
1112#[derive(Clone, Debug)]
1113pub struct LirDebugInfo {
1114 debug_name: String,
1115 id: GlobalId,
1116}
1117
1118impl std::fmt::Display for LirDebugInfo {
1119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1120 write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1121 }
1122}