mz_compute_types/plan/lowering.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::BTreeMap;
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18 AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::optimize::OptimizerFeatures;
23use mz_repr::{GlobalId, Timestamp};
24
25use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
26use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
27use crate::plan::reduce::{KeyValPlan, ReducePlan};
28use crate::plan::threshold::ThresholdPlan;
29use crate::plan::top_k::TopKPlan;
30use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
31
/// Mutable state threaded through the lowering of a single
/// [`DataflowDescription`] from MIR to LIR.
///
/// Created once per dataflow by [`Context::new`] and consumed by
/// [`Context::lower`].
pub(super) struct Context {
    /// Known bindings to (possibly arranged) collections.
    ///
    /// Populated from index/source imports and from each built object;
    /// `Let`/`LetRec` lowering temporarily adds and removes local bindings.
    arrangements: BTreeMap<Id, AvailableCollections>,
    /// Tracks the next available `LirId`.
    next_lir_id: LirId,
    /// Information to print along with error messages.
    debug_info: LirDebugInfo,
    /// Whether to enable fusion of MFPs in reductions.
    enable_reduce_mfp_fusion: bool,
}
42
43impl Context {
44 pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
45 Self {
46 arrangements: Default::default(),
47 next_lir_id: LirId(1),
48 debug_info: LirDebugInfo {
49 debug_name,
50 id: GlobalId::Transient(0),
51 },
52 enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
53 }
54 }
55
56 fn allocate_lir_id(&mut self) -> LirId {
57 let id = self.next_lir_id;
58 self.next_lir_id = LirId(
59 self.next_lir_id
60 .0
61 .checked_add(1)
62 .expect("No LirId overflow"),
63 );
64 id
65 }
66
67 pub fn lower(
68 mut self,
69 desc: DataflowDescription<OptimizedMirRelationExpr>,
70 ) -> Result<DataflowDescription<Plan>, String> {
71 // Sources might provide arranged forms of their data, in the future.
72 // Indexes provide arranged forms of their data.
73 for IndexImport {
74 desc: index_desc,
75 typ,
76 ..
77 } in desc.index_imports.values()
78 {
79 let key = index_desc.key.clone();
80 // TODO[btv] - We should be told the permutation by
81 // `index_desc`, and it should have been generated
82 // at the same point the thinning logic was.
83 //
84 // We should for sure do that soon, but it requires
85 // a bit of a refactor, so for now we just
86 // _assume_ that they were both generated by `permutation_for_arrangement`,
87 // and recover it here.
88 let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
89 let index_keys = self
90 .arrangements
91 .entry(Id::Global(index_desc.on_id))
92 .or_insert_with(AvailableCollections::default);
93 index_keys.arranged.push((key, permutation, thinning));
94 }
95 for id in desc.source_imports.keys() {
96 self.arrangements
97 .entry(Id::Global(*id))
98 .or_insert_with(AvailableCollections::new_raw);
99 }
100
101 // Build each object in order, registering the arrangements it forms.
102 let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
103 for build in desc.objects_to_build {
104 self.debug_info.id = build.id;
105 let (plan, keys) = self.lower_mir_expr(&build.plan)?;
106
107 self.arrangements.insert(Id::Global(build.id), keys);
108 objects_to_build.push(BuildDesc { id: build.id, plan });
109 }
110
111 Ok(DataflowDescription {
112 source_imports: desc.source_imports,
113 index_imports: desc.index_imports,
114 objects_to_build,
115 index_exports: desc.index_exports,
116 sink_exports: desc.sink_exports,
117 as_of: desc.as_of,
118 until: desc.until,
119 initial_storage_as_of: desc.initial_storage_as_of,
120 refresh_schedule: desc.refresh_schedule,
121 debug_name: desc.debug_name,
122 time_dependence: desc.time_dependence,
123 })
124 }
125
    /// This method converts a MirRelationExpr into a plan that can be directly rendered.
    ///
    /// The rough structure is that we repeatedly extract map/filter/project operators
    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
    /// then produce a plan for the combination of that with the next operator.
    ///
    /// The method accesses `self.arrangements`, which it will locally add to and remove from for
    /// `Let` bindings (by the end of the call it should contain the same bindings as when it
    /// started).
    ///
    /// The result of the method is both a `Plan`, but also a list of arrangements that
    /// are certain to be produced, which can be relied on by the next steps in the plan.
    /// Each of the arrangement keys is associated with an MFP that must be applied if that
    /// arrangement is used, to back out the permutation associated with that arrangement.
    ///
    /// An empty list of arrangement keys indicates that only a `Collection` stream can
    /// be assumed to exist.
    fn lower_mir_expr(
        &mut self,
        expr: &MirRelationExpr,
    ) -> Result<(Plan, AvailableCollections), String> {
        // This function is recursive and can overflow its stack, so grow it if
        // needed. The growth here is unbounded. Our general solution for this problem
        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
        // depth. That however requires upstream error handling. This function is
        // currently called by the Coordinator after calls to `catalog_transact`,
        // and thus are not allowed to fail. Until that allows errors, we choose
        // to allow the unbounded growth here. We are though somewhat protected by
        // higher levels enforcing their own limits on stack depth (in the parser,
        // transformer/desugarer, and planner).
        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
    }
158
159 fn lower_mir_expr_stack_safe(
160 &mut self,
161 expr: &MirRelationExpr,
162 ) -> Result<(Plan, AvailableCollections), String> {
163 // Extract a maximally large MapFilterProject from `expr`.
164 // We will then try and push this in to the resulting expression.
165 //
166 // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
167 // While we would eventually like all plan stages to be able to absorb such
168 // general operators, not all of them can.
169 let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
170 // We attempt to plan what we have remaining, in the context of `mfp`.
171 // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
172 let (mut plan, mut keys) = match expr {
173 // These operators should have been extracted from the expression.
174 MirRelationExpr::Map { .. } => {
175 panic!("This operator should have been extracted");
176 }
177 MirRelationExpr::Filter { .. } => {
178 panic!("This operator should have been extracted");
179 }
180 MirRelationExpr::Project { .. } => {
181 panic!("This operator should have been extracted");
182 }
183 // These operators may not have been extracted, and need to result in a `Plan`.
184 MirRelationExpr::Constant { rows, typ: _ } => {
185 let lir_id = self.allocate_lir_id();
186 let node = PlanNode::Constant {
187 rows: rows.clone().map(|rows| {
188 rows.into_iter()
189 .map(|(row, diff)| (row, Timestamp::MIN, diff))
190 .collect()
191 }),
192 };
193 // The plan, not arranged in any way.
194 (node.as_plan(lir_id), AvailableCollections::new_raw())
195 }
196 MirRelationExpr::Get { id, typ: _, .. } => {
197 // This stage can absorb arbitrary MFP operators.
198 let mut mfp = mfp.take();
199 // If `mfp` is the identity, we can surface all imported arrangements.
200 // Otherwise, we apply `mfp` and promise no arrangements.
201 let mut in_keys = self
202 .arrangements
203 .get(id)
204 .cloned()
205 .unwrap_or_else(AvailableCollections::new_raw);
206
207 // Seek out an arrangement key that might be constrained to a literal.
208 // Note: this code has very little use nowadays, as its job was mostly taken over
209 // by `LiteralConstraints` (see in the below longer comment).
210 let key_val = in_keys
211 .arranged
212 .iter()
213 .filter_map(|key| {
214 mfp.literal_constraints(&key.0)
215 .map(|val| (key.clone(), val))
216 })
217 .max_by_key(|(key, _val)| key.0.len());
218
219 // Determine the plan of action for the `Get` stage.
220 let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
221 // This code path used to handle looking up literals from indexes, but it's
222 // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
223 // MIR transform instead. However, it's still called in a couple of tricky
224 // special cases:
225 // - `LiteralConstraints` handles only Gets of global ids, so this code still
226 // gets to handle Filters on top of Gets of local ids.
227 // - Lowering does a `MapFilterProject::extract_from_expression`, while
228 // `LiteralConstraints` does
229 // `MapFilterProject::extract_non_errors_from_expr_mut`.
230 // - It might happen that new literal constraint optimization opportunities
231 // appear somewhere near the end of the MIR optimizer after
232 // `LiteralConstraints` has already run.
233 // (Also note that a similar literal constraint handling machinery is also
234 // present when handling the leftover MFP after this big match.)
235 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
236 in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
237 GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
238 } else if !mfp.is_identity() {
239 // We need to ensure a collection exists, which means we must form it.
240 if let Some((key, permutation, thinning)) =
241 in_keys.arbitrary_arrangement().cloned()
242 {
243 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
244 in_keys.arranged = vec![(key.clone(), permutation, thinning)];
245 GetPlan::Arrangement(key, None, mfp)
246 } else {
247 GetPlan::Collection(mfp)
248 }
249 } else {
250 // By default, just pass input arrangements through.
251 GetPlan::PassArrangements
252 };
253
254 let out_keys = if let GetPlan::PassArrangements = plan {
255 in_keys.clone()
256 } else {
257 AvailableCollections::new_raw()
258 };
259
260 let lir_id = self.allocate_lir_id();
261 let node = PlanNode::Get {
262 id: id.clone(),
263 keys: in_keys,
264 plan,
265 };
266 // Return the plan, and any keys if an identity `mfp`.
267 (node.as_plan(lir_id), out_keys)
268 }
269 MirRelationExpr::Let { id, value, body } => {
270 // It would be unfortunate to have a non-trivial `mfp` here, as we hope
271 // that they would be pushed down. I am not sure if we should take the
272 // initiative to push down the `mfp` ourselves.
273
274 // Plan the value using only the initial arrangements, but
275 // introduce any resulting arrangements bound to `id`.
276 let (value, v_keys) = self.lower_mir_expr(value)?;
277 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
278 assert_none!(pre_existing);
279 // Plan the body using initial and `value` arrangements,
280 // and then remove reference to the value arrangements.
281 let (body, b_keys) = self.lower_mir_expr(body)?;
282 self.arrangements.remove(&Id::Local(*id));
283 // Return the plan, and any `body` arrangements.
284 let lir_id = self.allocate_lir_id();
285 (
286 PlanNode::Let {
287 id: id.clone(),
288 value: Box::new(value),
289 body: Box::new(body),
290 }
291 .as_plan(lir_id),
292 b_keys,
293 )
294 }
295 MirRelationExpr::LetRec {
296 ids,
297 values,
298 limits,
299 body,
300 } => {
301 assert_eq!(ids.len(), values.len());
302 assert_eq!(ids.len(), limits.len());
303 // Plan the values using only the available arrangements, but
304 // introduce any resulting arrangements bound to each `id`.
305 // Arrangements made available cannot be used by prior bindings,
306 // as we cannot circulate an arrangement through a `Variable` yet.
307 let mut lir_values = Vec::with_capacity(values.len());
308 for (id, value) in ids.iter().zip_eq(values) {
309 let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
310 // If `v_keys` does not contain an unarranged collection, we must form it.
311 if !v_keys.raw {
312 // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
313 let (input_key, permutation, thinning) =
314 v_keys.arbitrary_arrangement().unwrap();
315 let mut input_mfp = MapFilterProject::new(value.arity());
316 input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
317 let input_key = Some(input_key.clone());
318
319 let forms = AvailableCollections::new_raw();
320
321 // We just want to insert an `ArrangeBy` to form an unarranged collection,
322 // but there is a complication: We shouldn't break the invariant (created by
323 // `NormalizeLets`, and relied upon by the rendering) that there isn't
324 // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
325 // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
326 // instead of on top of the inner `LetRec`.
327 lir_value = match lir_value {
328 Plan {
329 node:
330 PlanNode::LetRec {
331 ids,
332 values,
333 limits,
334 body,
335 },
336 lir_id,
337 } => {
338 let inner_lir_id = self.allocate_lir_id();
339 PlanNode::LetRec {
340 ids,
341 values,
342 limits,
343 body: Box::new(
344 PlanNode::ArrangeBy {
345 input_key,
346 input: body,
347 input_mfp,
348 forms,
349 }
350 .as_plan(inner_lir_id),
351 ),
352 }
353 .as_plan(lir_id)
354 }
355 lir_value => {
356 let lir_id = self.allocate_lir_id();
357 PlanNode::ArrangeBy {
358 input_key,
359 input: Box::new(lir_value),
360 input_mfp,
361 forms,
362 }
363 .as_plan(lir_id)
364 }
365 };
366 v_keys.raw = true;
367 }
368 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
369 assert_none!(pre_existing);
370 lir_values.push(lir_value);
371 }
372 // As we exit the iterative scope, we must leave all arrangements behind,
373 // as they reference a timestamp coordinate that must be stripped off.
374 for id in ids.iter() {
375 self.arrangements
376 .insert(Id::Local(*id), AvailableCollections::new_raw());
377 }
378 // Plan the body using initial and `value` arrangements,
379 // and then remove reference to the value arrangements.
380 let (body, b_keys) = self.lower_mir_expr(body)?;
381 for id in ids.iter() {
382 self.arrangements.remove(&Id::Local(*id));
383 }
384 // Return the plan, and any `body` arrangements.
385 let lir_id = self.allocate_lir_id();
386 (
387 PlanNode::LetRec {
388 ids: ids.clone(),
389 values: lir_values,
390 limits: limits.clone(),
391 body: Box::new(body),
392 }
393 .as_plan(lir_id),
394 b_keys,
395 )
396 }
397 MirRelationExpr::FlatMap {
398 input: flat_map_input,
399 func,
400 exprs,
401 } => {
402 // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
403 // fused into the lowered `Reduce`.
404 //
405 // In theory, we could have implemented this also as an MIR transform. However, this
406 // is more of a physical optimization, which are sometimes unpleasant to make a part
407 // of the MIR pipeline. The specific problem here with putting this into the MIR
408 // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
409 // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
410 // emit multiple rows per group. Such semantic changes of MIR are very scary, since
411 // various parts of the optimizer assume that Reduce emits only 1 row per group, and
412 // it would be very hard to hunt down all these parts. (For example, key inference
413 // infers the group key as a unique key.)
414 let fused_with_reduce = 'fusion: {
415 if !matches!(func, TableFunc::UnnestList { .. }) {
416 break 'fusion None;
417 }
418 // We might have a Project of a single col between the FlatMap and the
419 // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
420 // result of the window function.)
421 let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
422 input: project_input,
423 outputs: projection,
424 } = &**flat_map_input
425 {
426 // We want this to be a single column, because we'll want to deal with only
427 // one aggregation in the `Reduce`. (The aggregation of a window function
428 // always stands alone currently: we plan them separately from other
429 // aggregations, and Reduces are never fused. When window functions are
430 // fused with each other, they end up in one aggregation. When there are
431 // multiple window functions in the same SELECT, but can't be fused, they
432 // end up in different Reduces.)
433 if let &[single_col] = &**projection {
434 (project_input, single_col)
435 } else {
436 break 'fusion None;
437 }
438 } else {
439 (flat_map_input, 0)
440 };
441 if let MirRelationExpr::Reduce {
442 input,
443 group_key,
444 aggregates,
445 monotonic,
446 expected_group_size,
447 } = &**maybe_reduce
448 {
449 if group_key.len() != num_grouping_keys
450 || aggregates.len() != 1
451 || !aggregates[0].func.can_fuse_with_unnest_list()
452 {
453 break 'fusion None;
454 }
455 // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
456 // above the FlatMap. Later, we'll mutate this to be the residual MFP that
457 // didn't get fused into the `Reduce`.
458 let non_fused_mfp_above_flat_map = &mut mfp;
459 let reduce_output_arity = num_grouping_keys + 1;
460 // We are fusing away the list that the FlatMap would have been unnesting,
461 // so the column that had that list disappears, so we have to permute the
462 // MFP above the FlatMap with this column disappearance.
463 let tweaked_mfp = {
464 let mut mfp = non_fused_mfp_above_flat_map.clone();
465 if mfp.demand().contains(&0) {
466 // I don't think this can happen currently that this MFP would
467 // refer to the list column, because both the list column and the
468 // MFP were constructed by the HIR-to-MIR lowering, so it's not just
469 // some random MFP that we are seeing here. But anyhow, it's better
470 // to check this here for robustness against future code changes.
471 break 'fusion None;
472 }
473 let permutation: BTreeMap<_, _> =
474 (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
475 mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
476 mfp
477 };
478 // We now put together the project that was before the FlatMap, and the
479 // tweaked version of the MFP that was after the FlatMap.
480 // (Part of this MFP might be fused into the Reduce.)
481 let mut project_and_tweaked_mfp = {
482 let mut mfp = MapFilterProject::new(reduce_output_arity);
483 mfp = mfp.project(vec![num_grouping_keys]);
484 mfp = MapFilterProject::compose(mfp, tweaked_mfp);
485 mfp
486 };
487 let fused = self.lower_reduce(
488 input,
489 group_key,
490 aggregates,
491 monotonic,
492 expected_group_size,
493 &mut project_and_tweaked_mfp,
494 true,
495 )?;
496 // Update the residual MFP.
497 *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
498 Some(fused)
499 } else {
500 break 'fusion None;
501 }
502 };
503 if let Some(fused_with_reduce) = fused_with_reduce {
504 fused_with_reduce
505 } else {
506 // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
507 let (input, keys) = self.lower_mir_expr(flat_map_input)?;
508 // This stage can absorb arbitrary MFP instances.
509 let mfp = mfp.take();
510 let mut exprs = exprs.clone();
511 let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
512 {
513 // We don't permute the MFP here, because it runs _after_ the table function,
514 // whose output is in a fixed order.
515 //
516 // We _do_, however, need to permute the `expr`s that provide input to the
517 // `func`.
518 let permutation = permutation.iter().cloned().enumerate().collect();
519 for expr in &mut exprs {
520 expr.permute_map(&permutation);
521 }
522
523 Some(k.clone())
524 } else {
525 None
526 };
527
528 let lir_id = self.allocate_lir_id();
529 // Return the plan, and no arrangements.
530 (
531 PlanNode::FlatMap {
532 input_key,
533 input: Box::new(input),
534 exprs: exprs.clone(),
535 func: func.clone(),
536 mfp_after: mfp,
537 }
538 .as_plan(lir_id),
539 AvailableCollections::new_raw(),
540 )
541 }
542 }
543 MirRelationExpr::Join {
544 inputs,
545 equivalences,
546 implementation,
547 } => {
548 // Plan each of the join inputs independently.
549 // The `plans` get surfaced upwards, and the `input_keys` should
550 // be used as part of join planning / to validate the existing
551 // plans / to aid in indexed seeding of update streams.
552 let mut plans = Vec::new();
553 let mut input_keys = Vec::new();
554 let mut input_arities = Vec::new();
555 for input in inputs.iter() {
556 let (plan, keys) = self.lower_mir_expr(input)?;
557 input_arities.push(input.arity());
558 plans.push(plan);
559 input_keys.push(keys);
560 }
561
562 let input_mapper =
563 JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
564
565 // Extract temporal predicates as joins cannot currently absorb them.
566 let (plan, missing) = match implementation {
567 IndexedFilter(_coll_id, _idx_id, key, _val) => {
568 // Start with the constant input. (This used to be important before database-issues#4016
569 // was fixed.)
570 let start: usize = 1;
571 let order = vec![(0usize, key.clone(), None)];
572 // All columns of the constant input will be part of the arrangement key.
573 let source_arrangement = (
574 (0..key.len())
575 .map(MirScalarExpr::column)
576 .collect::<Vec<_>>(),
577 (0..key.len()).collect::<Vec<_>>(),
578 Vec::<usize>::new(),
579 );
580 let (ljp, missing) = LinearJoinPlan::create_from(
581 start,
582 Some(&source_arrangement),
583 equivalences,
584 &order,
585 input_mapper,
586 &mut mfp,
587 &input_keys,
588 );
589 (JoinPlan::Linear(ljp), missing)
590 }
591 Differential((start, start_arr, _start_characteristic), order) => {
592 let source_arrangement = start_arr.as_ref().and_then(|key| {
593 input_keys[*start]
594 .arranged
595 .iter()
596 .find(|(k, _, _)| k == key)
597 .clone()
598 });
599 let (ljp, missing) = LinearJoinPlan::create_from(
600 *start,
601 source_arrangement,
602 equivalences,
603 order,
604 input_mapper,
605 &mut mfp,
606 &input_keys,
607 );
608 (JoinPlan::Linear(ljp), missing)
609 }
610 DeltaQuery(orders) => {
611 let (djp, missing) = DeltaJoinPlan::create_from(
612 equivalences,
613 orders,
614 input_mapper,
615 &mut mfp,
616 &input_keys,
617 );
618 (JoinPlan::Delta(djp), missing)
619 }
620 // Other plans are errors, and should be reported as such.
621 Unimplemented => return Err("unimplemented join".to_string()),
622 };
623 // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
624 // `missing`. We thus need to plan them here so they'll exist.
625 let is_delta = matches!(plan, JoinPlan::Delta(_));
626 for (((input_plan, input_keys), missing), arity) in plans
627 .iter_mut()
628 .zip_eq(input_keys.iter())
629 .zip_eq(missing.into_iter())
630 .zip_eq(input_arities.iter().cloned())
631 {
632 if missing != Default::default() {
633 if is_delta {
634 // join_implementation.rs produced a sub-optimal plan here;
635 // we shouldn't plan delta joins at all if not all of the required
636 // arrangements are available. Soft panic in CI and log an error in
637 // production to increase the chances that we will catch all situations
638 // that violate this constraint.
639 soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
640Dataflow info: {}
641This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
642 } else {
643 soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
644Dataflow info: {}
645This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
646 // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
647 // Join input. (Earlier, they were missing in the following cases:
648 // - They were const-folded away for constant inputs. This is not
649 // happening since
650 // https://github.com/MaterializeInc/materialize/pull/16351
651 // - They were not being inserted for the constant input of
652 // `IndexedFilter`s. This was fixed in
653 // https://github.com/MaterializeInc/materialize/pull/20920
654 // - They were not being inserted for the first input of Differential
655 // joins. This was fixed in
656 // https://github.com/MaterializeInc/materialize/pull/16099)
657 }
658 let lir_id = self.allocate_lir_id();
659 let raw_plan = std::mem::replace(
660 input_plan,
661 PlanNode::Constant {
662 rows: Ok(Vec::new()),
663 }
664 .as_plan(lir_id),
665 );
666 *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
667 }
668 }
669 // Return the plan, and no arrangements.
670 let lir_id = self.allocate_lir_id();
671 (
672 PlanNode::Join {
673 inputs: plans,
674 plan,
675 }
676 .as_plan(lir_id),
677 AvailableCollections::new_raw(),
678 )
679 }
680 MirRelationExpr::Reduce {
681 input,
682 group_key,
683 aggregates,
684 monotonic,
685 expected_group_size,
686 } => {
687 if aggregates
688 .iter()
689 .any(|agg| agg.func.can_fuse_with_unnest_list())
690 {
691 // This case should have been handled at the `MirRelationExpr::FlatMap` case
692 // above. But that has a pretty complicated pattern matching, so it's not
693 // unthinkable that it fails.
694 soft_panic_or_log!(
695 "Window function performance issue: `reduce_unnest_list_fusion` failed"
696 );
697 }
698 self.lower_reduce(
699 input,
700 group_key,
701 aggregates,
702 monotonic,
703 expected_group_size,
704 &mut mfp,
705 false,
706 )?
707 }
708 MirRelationExpr::TopK {
709 input,
710 group_key,
711 order_key,
712 limit,
713 offset,
714 monotonic,
715 expected_group_size,
716 } => {
717 let arity = input.arity();
718 let (input, keys) = self.lower_mir_expr(input)?;
719
720 let top_k_plan = TopKPlan::create_from(
721 group_key.clone(),
722 order_key.clone(),
723 *offset,
724 limit.clone(),
725 arity,
726 *monotonic,
727 *expected_group_size,
728 );
729
730 // We don't have an MFP here -- install an operator to permute the
731 // input, if necessary.
732 let input = if !keys.raw {
733 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
734 } else {
735 input
736 };
737 // Return the plan, and no arrangements.
738 let lir_id = self.allocate_lir_id();
739 (
740 PlanNode::TopK {
741 input: Box::new(input),
742 top_k_plan,
743 }
744 .as_plan(lir_id),
745 AvailableCollections::new_raw(),
746 )
747 }
748 MirRelationExpr::Negate { input } => {
749 let arity = input.arity();
750 let (input, keys) = self.lower_mir_expr(input)?;
751
752 // We don't have an MFP here -- install an operator to permute the
753 // input, if necessary.
754 let input = if !keys.raw {
755 self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
756 } else {
757 input
758 };
759 // Return the plan, and no arrangements.
760 let lir_id = self.allocate_lir_id();
761 (
762 PlanNode::Negate {
763 input: Box::new(input),
764 }
765 .as_plan(lir_id),
766 AvailableCollections::new_raw(),
767 )
768 }
769 MirRelationExpr::Threshold { input } => {
770 let (plan, keys) = self.lower_mir_expr(input)?;
771 let arity = input.arity();
772 let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
773 let plan = if !keys
774 .arranged
775 .iter()
776 .any(|(key, _, _)| key == &required_arrangement.0)
777 {
778 self.arrange_by(
779 plan,
780 AvailableCollections::new_arranged(vec![required_arrangement]),
781 &keys,
782 arity,
783 )
784 } else {
785 plan
786 };
787
788 let output_keys = threshold_plan.keys();
789 // Return the plan, and any produced keys.
790 let lir_id = self.allocate_lir_id();
791 (
792 PlanNode::Threshold {
793 input: Box::new(plan),
794 threshold_plan,
795 }
796 .as_plan(lir_id),
797 output_keys,
798 )
799 }
800 MirRelationExpr::Union { base, inputs } => {
801 let arity = base.arity();
802 let mut plans_keys = Vec::with_capacity(1 + inputs.len());
803 let (plan, keys) = self.lower_mir_expr(base)?;
804 plans_keys.push((plan, keys));
805 for input in inputs.iter() {
806 let (plan, keys) = self.lower_mir_expr(input)?;
807 plans_keys.push((plan, keys));
808 }
809 let plans = plans_keys
810 .into_iter()
811 .map(|(plan, keys)| {
812 // We don't have an MFP here -- install an operator to permute the
813 // input, if necessary.
814 if !keys.raw {
815 self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
816 } else {
817 plan
818 }
819 })
820 .collect();
821 // Return the plan and no arrangements.
822 let lir_id = self.allocate_lir_id();
823 (
824 PlanNode::Union {
825 inputs: plans,
826 consolidate_output: false,
827 }
828 .as_plan(lir_id),
829 AvailableCollections::new_raw(),
830 )
831 }
832 MirRelationExpr::ArrangeBy { input, keys } => {
833 let input_mir = input;
834 let (input, mut input_keys) = self.lower_mir_expr(input)?;
835 // Fill the `types` in `input_keys` if not already present.
836 let arity = input_mir.arity();
837
838 // Determine keys that are not present in `input_keys`.
839 let new_keys = keys
840 .iter()
841 .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
842 .cloned()
843 .collect::<Vec<_>>();
844 if new_keys.is_empty() {
845 (input, input_keys)
846 } else {
847 let mut new_keys = new_keys
848 .iter()
849 .cloned()
850 .map(|k| {
851 let (permutation, thinning) = permutation_for_arrangement(&k, arity);
852 (k, permutation, thinning)
853 })
854 .collect::<Vec<_>>();
855 let forms = AvailableCollections {
856 raw: input_keys.raw,
857 arranged: new_keys.clone(),
858 };
859 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
860 input_keys.arbitrary_arrangement()
861 {
862 let mut mfp = MapFilterProject::new(arity);
863 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
864 (Some(input_key.clone()), mfp)
865 } else {
866 (None, MapFilterProject::new(arity))
867 };
868 input_keys.arranged.append(&mut new_keys);
869 input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
870
871 // Return the plan and extended keys.
872 let lir_id = self.allocate_lir_id();
873 (
874 PlanNode::ArrangeBy {
875 input_key,
876 input: Box::new(input),
877 input_mfp,
878 forms,
879 }
880 .as_plan(lir_id),
881 input_keys,
882 )
883 }
884 }
885 };
886
887 // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
888 if !mfp.is_identity() {
889 // Seek out an arrangement key that might be constrained to a literal.
890 // TODO: Improve key selection heuristic.
891 let key_val = keys
892 .arranged
893 .iter()
894 .filter_map(|(key, permutation, thinning)| {
895 let mut mfp = mfp.clone();
896 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
897 mfp.literal_constraints(key)
898 .map(|val| (key.clone(), permutation, thinning, val))
899 })
900 .max_by_key(|(key, _, _, _)| key.len());
901
902 // Input key selection strategy:
903 // (1) If we can read a key at a particular value, do so
904 // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
905 // therefore allows us to avoid discarding the arrangement, use that.
906 // (3) Otherwise, if there is _some_ key, use that,
907 // (4) Otherwise just read the raw collection.
908 let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
909 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
910
911 Some((key, Some(val)))
912 } else if let Some((key, permutation, thinning)) =
913 keys.arranged.iter().find(|(key, permutation, thinning)| {
914 let mut mfp = mfp.clone();
915 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
916 mfp.is_identity()
917 })
918 {
919 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
920 Some((key.clone(), None))
921 } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
922 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
923 Some((key.clone(), None))
924 } else {
925 None
926 };
927
928 if mfp.is_identity() {
929 // We have discovered a key
930 // whose permutation causes the MFP to actually
931 // be the identity! We can keep it around,
932 // but without its permutation this time,
933 // and with a trivial thinning of the right length.
934 let (key, val) = input_key_val.unwrap();
935 let (_old_key, old_permutation, old_thinning) = keys
936 .arranged
937 .iter_mut()
938 .find(|(key2, _, _)| key2 == &key)
939 .unwrap();
940 *old_permutation = (0..mfp.input_arity).collect();
941 let old_thinned_arity = old_thinning.len();
942 *old_thinning = (0..old_thinned_arity).collect();
943 // Get rid of all other forms, as this is now the only one known to be valid.
944 // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
945 // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
946 keys.arranged.retain(|(key2, _, _)| key2 == &key);
947 keys.raw = false;
948
949 // Creating a Plan::Mfp node is now logically unnecessary, but we
950 // should do so anyway when `val` is populated, so that
951 // the `key_val` optimization gets applied.
952 let lir_id = self.allocate_lir_id();
953 if val.is_some() {
954 plan = PlanNode::Mfp {
955 input: Box::new(plan),
956 mfp,
957 input_key_val: Some((key, val)),
958 }
959 .as_plan(lir_id)
960 }
961 } else {
962 let lir_id = self.allocate_lir_id();
963 plan = PlanNode::Mfp {
964 input: Box::new(plan),
965 mfp,
966 input_key_val,
967 }
968 .as_plan(lir_id);
969 keys = AvailableCollections::new_raw();
970 }
971 }
972
973 Ok((plan, keys))
974 }
975
976 /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
977 /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
978 /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
979 /// fused.
980 fn lower_reduce(
981 &mut self,
982 input: &MirRelationExpr,
983 group_key: &Vec<MirScalarExpr>,
984 aggregates: &Vec<AggregateExpr>,
985 monotonic: &bool,
986 expected_group_size: &Option<u64>,
987 mfp_on_top: &mut MapFilterProject,
988 fused_unnest_list: bool,
989 ) -> Result<(Plan, AvailableCollections), String> {
990 let input_arity = input.arity();
991 let (input, keys) = self.lower_mir_expr(input)?;
992 let (input_key, permutation_and_new_arity) =
993 if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
994 (
995 Some(input_key.clone()),
996 Some((permutation.clone(), thinning.len() + input_key.len())),
997 )
998 } else {
999 (None, None)
1000 };
1001 let key_val_plan = KeyValPlan::new(
1002 input_arity,
1003 group_key,
1004 aggregates,
1005 permutation_and_new_arity,
1006 );
1007 let reduce_plan = ReducePlan::create_from(
1008 aggregates.clone(),
1009 *monotonic,
1010 *expected_group_size,
1011 fused_unnest_list,
1012 );
1013 // Return the plan, and the keys it produces.
1014 let mfp_after;
1015 let output_arity;
1016 if self.enable_reduce_mfp_fusion {
1017 (mfp_after, *mfp_on_top, output_arity) =
1018 reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1019 } else {
1020 (mfp_after, output_arity) = (
1021 MapFilterProject::new(mfp_on_top.input_arity),
1022 group_key.len() + aggregates.len(),
1023 );
1024 }
1025 soft_assert_eq_or_log!(
1026 mfp_on_top.input_arity,
1027 output_arity,
1028 "Output arity of reduce must match input arity for MFP on top of it"
1029 );
1030 let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1031 let lir_id = self.allocate_lir_id();
1032 Ok((
1033 PlanNode::Reduce {
1034 input_key,
1035 input: Box::new(input),
1036 key_val_plan,
1037 plan: reduce_plan,
1038 mfp_after,
1039 }
1040 .as_plan(lir_id),
1041 output_keys,
1042 ))
1043 }
1044
1045 /// Replace the plan with another one
1046 /// that has the collection in some additional forms.
1047 pub fn arrange_by(
1048 &mut self,
1049 plan: Plan,
1050 collections: AvailableCollections,
1051 old_collections: &AvailableCollections,
1052 arity: usize,
1053 ) -> Plan {
1054 if let Plan {
1055 node:
1056 PlanNode::ArrangeBy {
1057 input_key,
1058 input,
1059 input_mfp,
1060 mut forms,
1061 },
1062 lir_id,
1063 } = plan
1064 {
1065 forms.raw |= collections.raw;
1066 forms.arranged.extend(collections.arranged);
1067 forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1068 forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1069 PlanNode::ArrangeBy {
1070 input_key,
1071 input,
1072 input_mfp,
1073 forms,
1074 }
1075 .as_plan(lir_id)
1076 } else {
1077 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1078 old_collections.arbitrary_arrangement()
1079 {
1080 let mut mfp = MapFilterProject::new(arity);
1081 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1082 (Some(input_key.clone()), mfp)
1083 } else {
1084 (None, MapFilterProject::new(arity))
1085 };
1086 let lir_id = self.allocate_lir_id();
1087
1088 PlanNode::ArrangeBy {
1089 input_key,
1090 input: Box::new(plan),
1091 input_mfp,
1092 forms: collections,
1093 }
1094 .as_plan(lir_id)
1095 }
1096 }
1097}
1098
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Human-readable name of the dataflow being lowered (supplied to `Context::new`).
    debug_name: String,
    /// Id of the object currently being lowered; initialized to
    /// `GlobalId::Transient(0)` in `Context::new`.
    id: GlobalId,
}
1106
1107impl std::fmt::Display for LirDebugInfo {
1108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1109 write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1110 }
1111}