Skip to main content

mz_compute_types/plan/
lowering.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18    AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::optimize::OptimizerFeatures;
23use mz_repr::{GlobalId, Timestamp};
24
25use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
26use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
27use crate::plan::reduce::{KeyValPlan, ReducePlan};
28use crate::plan::threshold::ThresholdPlan;
29use crate::plan::top_k::TopKPlan;
30use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Plan, PlanNode};
31
32/// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped
33/// updates. Future updates are the only case where temporal bucketing pays off.
34fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy {
35    if has_future_updates {
36        ArrangementStrategy::TemporalBucketing
37    } else {
38        ArrangementStrategy::Direct
39    }
40}
41
42/// The result of lowering a [`MirRelationExpr`] to a [`Plan`].
43struct LoweredExpr {
44    /// The lowered plan.
45    plan: Plan,
46    /// The arrangement keys that the plan is certain to produce.
47    keys: AvailableCollections,
48    /// Whether the plan's output may contain updates at future timestamps,
49    /// e.g., from a temporal MFP using `mz_now()`.
50    has_future_updates: bool,
51}
52
53pub(super) struct Context {
54    /// Known bindings to (possibly arranged) collections.
55    arrangements: BTreeMap<Id, AvailableCollections>,
56    /// Ids whose collections may contain updates at future timestamps,
57    /// e.g., from a temporal MFP using `mz_now()`.
58    has_future_updates: BTreeSet<Id>,
59    /// Tracks the next available `LirId`.
60    next_lir_id: LirId,
61    /// Information to print along with error messages.
62    debug_info: LirDebugInfo,
63    /// Whether to enable fusion of MFPs in reductions.
64    enable_reduce_mfp_fusion: bool,
65}
66
67impl Context {
68    pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
69        Self {
70            arrangements: Default::default(),
71            has_future_updates: Default::default(),
72            next_lir_id: LirId(1),
73            debug_info: LirDebugInfo {
74                debug_name,
75                id: GlobalId::Transient(0),
76            },
77            enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
78        }
79    }
80
81    fn allocate_lir_id(&mut self) -> LirId {
82        let id = self.next_lir_id;
83        self.next_lir_id = LirId(
84            self.next_lir_id
85                .0
86                .checked_add(1)
87                .expect("No LirId overflow"),
88        );
89        id
90    }
91
92    pub fn lower(
93        mut self,
94        desc: DataflowDescription<OptimizedMirRelationExpr>,
95    ) -> Result<DataflowDescription<Plan>, String> {
96        // Sources might provide arranged forms of their data, in the future.
97        // Indexes provide arranged forms of their data.
98        for IndexImport {
99            desc: index_desc,
100            typ,
101            ..
102        } in desc.index_imports.values()
103        {
104            let key = index_desc.key.clone();
105            // TODO[btv] - We should be told the permutation by
106            // `index_desc`, and it should have been generated
107            // at the same point the thinning logic was.
108            //
109            // We should for sure do that soon, but it requires
110            // a bit of a refactor, so for now we just
111            // _assume_ that they were both generated by `permutation_for_arrangement`,
112            // and recover it here.
113            let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
114            let index_keys = self
115                .arrangements
116                .entry(Id::Global(index_desc.on_id))
117                .or_insert_with(AvailableCollections::default);
118            index_keys.arranged.push((key, permutation, thinning));
119        }
120        for id in desc.source_imports.keys() {
121            self.arrangements
122                .entry(Id::Global(*id))
123                .or_insert_with(AvailableCollections::new_raw);
124        }
125
126        // Build each object in order, registering the arrangements it forms.
127        let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
128        for build in desc.objects_to_build {
129            self.debug_info.id = build.id;
130            let LoweredExpr {
131                plan,
132                keys,
133                has_future_updates,
134            } = self.lower_mir_expr(&build.plan)?;
135
136            self.arrangements.insert(Id::Global(build.id), keys);
137            if has_future_updates {
138                self.has_future_updates.insert(Id::Global(build.id));
139            }
140            objects_to_build.push(BuildDesc { id: build.id, plan });
141        }
142
143        Ok(DataflowDescription {
144            source_imports: desc.source_imports,
145            index_imports: desc.index_imports,
146            objects_to_build,
147            index_exports: desc.index_exports,
148            sink_exports: desc.sink_exports,
149            as_of: desc.as_of,
150            until: desc.until,
151            initial_storage_as_of: desc.initial_storage_as_of,
152            refresh_schedule: desc.refresh_schedule,
153            debug_name: desc.debug_name,
154            time_dependence: desc.time_dependence,
155        })
156    }
157
158    /// This method converts a MirRelationExpr into a plan that can be directly rendered.
159    ///
160    /// The rough structure is that we repeatedly extract map/filter/project operators
161    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
162    /// then produce a plan for the combination of that with the next operator.
163    ///
164    /// The method accesses `self.arrangements`, which it will locally add to and remove from for
165    /// `Let` bindings (by the end of the call it should contain the same bindings as when it
166    /// started).
167    ///
168    /// The result of the method is both a `Plan`, but also a list of arrangements that
169    /// are certain to be produced, which can be relied on by the next steps in the plan.
170    /// Each of the arrangement keys is associated with an MFP that must be applied if that
171    /// arrangement is used, to back out the permutation associated with that arrangement.
172    ///
173    /// An empty list of arrangement keys indicates that only a `Collection` stream can
174    /// be assumed to exist.
175    fn lower_mir_expr(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
176        // This function is recursive and can overflow its stack, so grow it if
177        // needed. The growth here is unbounded. Our general solution for this problem
178        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
179        // depth. That however requires upstream error handling. This function is
180        // currently called by the Coordinator after calls to `catalog_transact`,
181        // and thus are not allowed to fail. Until that allows errors, we choose
182        // to allow the unbounded growth here. We are though somewhat protected by
183        // higher levels enforcing their own limits on stack depth (in the parser,
184        // transformer/desugarer, and planner).
185        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
186    }
187
188    fn lower_mir_expr_stack_safe(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
189        // Extract a maximally large MapFilterProject from `expr`.
190        // We will then try and push this in to the resulting expression.
191        //
192        // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
193        // While we would eventually like all plan stages to be able to absorb such
194        // general operators, not all of them can.
195        let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
196        // We attempt to plan what we have remaining, in the context of `mfp`.
197        // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
198        let LoweredExpr {
199            mut plan,
200            mut keys,
201            mut has_future_updates,
202        } = match expr {
203            // These operators should have been extracted from the expression.
204            MirRelationExpr::Map { .. } => {
205                panic!("This operator should have been extracted");
206            }
207            MirRelationExpr::Filter { .. } => {
208                panic!("This operator should have been extracted");
209            }
210            MirRelationExpr::Project { .. } => {
211                panic!("This operator should have been extracted");
212            }
213            // These operators may not have been extracted, and need to result in a `Plan`.
214            MirRelationExpr::Constant { rows, typ: _ } => {
215                let lir_id = self.allocate_lir_id();
216                let node = PlanNode::Constant {
217                    rows: rows.clone().map(|rows| {
218                        rows.into_iter()
219                            .map(|(row, diff)| (row, Timestamp::MIN, diff))
220                            .collect()
221                    }),
222                };
223                // The plan, not arranged in any way.
224                LoweredExpr {
225                    plan: node.as_plan(lir_id),
226                    keys: AvailableCollections::new_raw(),
227                    has_future_updates: false,
228                }
229            }
230            MirRelationExpr::Get { id, typ: _, .. } => {
231                // This stage can absorb arbitrary MFP operators.
232                let mut mfp = mfp.take();
233                // If `mfp` is the identity, we can surface all imported arrangements.
234                // Otherwise, we apply `mfp` and promise no arrangements.
235                let mut in_keys = self
236                    .arrangements
237                    .get(id)
238                    .cloned()
239                    .unwrap_or_else(AvailableCollections::new_raw);
240
241                // Seek out an arrangement key that might be constrained to a literal.
242                // Note: this code has very little use nowadays, as its job was mostly taken over
243                // by `LiteralConstraints` (see in the below longer comment).
244                let key_val = in_keys
245                    .arranged
246                    .iter()
247                    .filter_map(|key| {
248                        mfp.literal_constraints(&key.0)
249                            .map(|val| (key.clone(), val))
250                    })
251                    .max_by_key(|(key, _val)| key.0.len());
252
253                // Determine the plan of action for the `Get` stage.
254                let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
255                    // This code path used to handle looking up literals from indexes, but it's
256                    // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
257                    // MIR transform instead. However, it's still called in a couple of tricky
258                    // special cases:
259                    // - `LiteralConstraints` handles only Gets of global ids, so this code still
260                    //   gets to handle Filters on top of Gets of local ids.
261                    // - Lowering does a `MapFilterProject::extract_from_expression`, while
262                    //   `LiteralConstraints` does
263                    //   `MapFilterProject::extract_non_errors_from_expr_mut`.
264                    // - It might happen that new literal constraint optimization opportunities
265                    //   appear somewhere near the end of the MIR optimizer after
266                    //   `LiteralConstraints` has already run.
267                    // (Also note that a similar literal constraint handling machinery is also
268                    // present when handling the leftover MFP after this big match.)
269                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
270                    in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
271                    GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
272                } else if !mfp.is_identity() {
273                    // We need to ensure a collection exists, which means we must form it.
274                    if let Some((key, permutation, thinning)) =
275                        in_keys.arbitrary_arrangement().cloned()
276                    {
277                        mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
278                        in_keys.arranged = vec![(key.clone(), permutation, thinning)];
279                        GetPlan::Arrangement(key, None, mfp)
280                    } else {
281                        GetPlan::Collection(mfp)
282                    }
283                } else {
284                    // By default, just pass input arrangements through.
285                    GetPlan::PassArrangements
286                };
287
288                let out_keys = if let GetPlan::PassArrangements = plan {
289                    in_keys.clone()
290                } else {
291                    AvailableCollections::new_raw()
292                };
293
294                // Even with a non-temporal MFP, we must propagate `has_future_updates`
295                // from the underlying binding — applying an MFP doesn't drop future-
296                // timestamped updates that already exist on the input.
297                let has_future_updates = self.has_future_updates.contains(id)
298                    || match &plan {
299                        GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => {
300                            mfp.has_temporal_predicates()
301                        }
302                        GetPlan::PassArrangements => false,
303                    };
304
305                let lir_id = self.allocate_lir_id();
306                let node = PlanNode::Get {
307                    id: id.clone(),
308                    keys: in_keys,
309                    plan,
310                };
311                // Return the plan, and any keys if an identity `mfp`.
312                LoweredExpr {
313                    plan: node.as_plan(lir_id),
314                    keys: out_keys,
315                    has_future_updates,
316                }
317            }
318            MirRelationExpr::Let { id, value, body } => {
319                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
320                // that they would be pushed down. I am not sure if we should take the
321                // initiative to push down the `mfp` ourselves.
322
323                // Plan the value using only the initial arrangements, but
324                // introduce any resulting arrangements bound to `id`.
325                let LoweredExpr {
326                    plan: value,
327                    keys: v_keys,
328                    has_future_updates: v_future,
329                } = self.lower_mir_expr(value)?;
330                let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
331                assert_none!(pre_existing);
332                if v_future {
333                    self.has_future_updates.insert(Id::Local(*id));
334                }
335                // Plan the body using initial and `value` arrangements,
336                // and then remove reference to the value arrangements.
337                let LoweredExpr {
338                    plan: body,
339                    keys: b_keys,
340                    has_future_updates: b_future,
341                } = self.lower_mir_expr(body)?;
342                self.arrangements.remove(&Id::Local(*id));
343                self.has_future_updates.remove(&Id::Local(*id));
344                // Return the plan, and any `body` arrangements.
345                let lir_id = self.allocate_lir_id();
346                LoweredExpr {
347                    plan: PlanNode::Let {
348                        id: id.clone(),
349                        value: Box::new(value),
350                        body: Box::new(body),
351                    }
352                    .as_plan(lir_id),
353                    keys: b_keys,
354                    has_future_updates: b_future,
355                }
356            }
357            MirRelationExpr::LetRec {
358                ids,
359                values,
360                limits,
361                body,
362            } => {
363                assert_eq!(ids.len(), values.len());
364                assert_eq!(ids.len(), limits.len());
365                // Plan the values using only the available arrangements, but
366                // introduce any resulting arrangements bound to each `id`.
367                // Arrangements made available cannot be used by prior bindings,
368                // as we cannot circulate an arrangement through a `Variable` yet.
369                let mut lir_values = Vec::with_capacity(values.len());
370                let mut any_v_future = false;
371                for (id, value) in ids.iter().zip_eq(values) {
372                    let LoweredExpr {
373                        plan: mut lir_value,
374                        keys: mut v_keys,
375                        has_future_updates: v_future,
376                    } = self.lower_mir_expr(value)?;
377                    any_v_future |= v_future;
378                    // If `v_keys` does not contain an unarranged collection, we must form it.
379                    if !v_keys.raw {
380                        // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
381                        let (input_key, permutation, thinning) =
382                            v_keys.arbitrary_arrangement().unwrap();
383                        let mut input_mfp = MapFilterProject::new(value.arity());
384                        input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
385                        let input_key = Some(input_key.clone());
386
387                        let forms = AvailableCollections::new_raw();
388
389                        // We just want to insert an `ArrangeBy` to form an unarranged collection,
390                        // but there is a complication: We shouldn't break the invariant (created by
391                        // `NormalizeLets`, and relied upon by the rendering) that there isn't
392                        // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
393                        // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
394                        // instead of on top of the inner `LetRec`.
395                        //
396                        // We forward `v_future` for honesty; bucketing has no observable effect
397                        // inside an iterative scope, but the field should reflect reality.
398                        lir_value = match lir_value {
399                            Plan {
400                                node:
401                                    PlanNode::LetRec {
402                                        ids,
403                                        values,
404                                        limits,
405                                        body,
406                                    },
407                                lir_id,
408                            } => {
409                                let inner_lir_id = self.allocate_lir_id();
410                                PlanNode::LetRec {
411                                    ids,
412                                    values,
413                                    limits,
414                                    body: Box::new(
415                                        PlanNode::ArrangeBy {
416                                            input_key,
417                                            input: body,
418                                            input_mfp,
419                                            forms,
420                                            strategy: strategy_from_future(v_future),
421                                        }
422                                        .as_plan(inner_lir_id),
423                                    ),
424                                }
425                                .as_plan(lir_id)
426                            }
427                            lir_value => {
428                                let lir_id = self.allocate_lir_id();
429                                PlanNode::ArrangeBy {
430                                    input_key,
431                                    input: Box::new(lir_value),
432                                    input_mfp,
433                                    forms,
434                                    strategy: strategy_from_future(v_future),
435                                }
436                                .as_plan(lir_id)
437                            }
438                        };
439                        v_keys.raw = true;
440                    }
441                    let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
442                    assert_none!(pre_existing);
443                    if v_future {
444                        self.has_future_updates.insert(Id::Local(*id));
445                    }
446                    lir_values.push(lir_value);
447                }
448                // As we exit the iterative scope, we must leave all arrangements behind,
449                // as they reference a timestamp coordinate that must be stripped off.
450                for id in ids.iter() {
451                    self.arrangements
452                        .insert(Id::Local(*id), AvailableCollections::new_raw());
453                }
454                // Plan the body using initial and `value` arrangements,
455                // and then remove reference to the value arrangements.
456                let LoweredExpr {
457                    plan: body,
458                    keys: b_keys,
459                    has_future_updates: b_future,
460                } = self.lower_mir_expr(body)?;
461                for id in ids.iter() {
462                    self.arrangements.remove(&Id::Local(*id));
463                    self.has_future_updates.remove(&Id::Local(*id));
464                }
465                // Return the plan, and any `body` arrangements.
466                //
467                // The body's `b_future` alone can under-report: an earlier binding may only
468                // inherit `has_future_updates` via a Variable to a *later* binding, which the
469                // sequential sweep can't observe at the time the earlier binding is lowered.
470                // A precise fix would require a fixpoint (or the MIR `Analysis` framework with
471                // a `true ⊑ false` lattice). As a cheap correct alternative, OR with the
472                // bindings' future flags: any cross-binding propagation must originate from a
473                // local temporal predicate inside *some* binding, so the OR captures it
474                // without forcing bucketing on a fully non-temporal LetRec.
475                let lir_id = self.allocate_lir_id();
476                LoweredExpr {
477                    plan: PlanNode::LetRec {
478                        ids: ids.clone(),
479                        values: lir_values,
480                        limits: limits.clone(),
481                        body: Box::new(body),
482                    }
483                    .as_plan(lir_id),
484                    keys: b_keys,
485                    has_future_updates: b_future || any_v_future,
486                }
487            }
488            MirRelationExpr::FlatMap {
489                input: flat_map_input,
490                func,
491                exprs,
492            } => {
493                // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
494                // fused into the lowered `Reduce`.
495                //
496                // In theory, we could have implemented this also as an MIR transform. However, this
497                // is more of a physical optimization, which are sometimes unpleasant to make a part
498                // of the MIR pipeline. The specific problem here with putting this into the MIR
499                // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
500                // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
501                // emit multiple rows per group. Such semantic changes of MIR are very scary, since
502                // various parts of the optimizer assume that Reduce emits only 1 row per group, and
503                // it would be very hard to hunt down all these parts. (For example, key inference
504                // infers the group key as a unique key.)
505                let fused_with_reduce = 'fusion: {
506                    if !matches!(func, TableFunc::UnnestList { .. }) {
507                        break 'fusion None;
508                    }
509                    // We might have a Project of a single col between the FlatMap and the
510                    // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
511                    // result of the window function.)
512                    let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
513                        input: project_input,
514                        outputs: projection,
515                    } = &**flat_map_input
516                    {
517                        // We want this to be a single column, because we'll want to deal with only
518                        // one aggregation in the `Reduce`. (The aggregation of a window function
519                        // always stands alone currently: we plan them separately from other
520                        // aggregations, and Reduces are never fused. When window functions are
521                        // fused with each other, they end up in one aggregation. When there are
522                        // multiple window functions in the same SELECT, but can't be fused, they
523                        // end up in different Reduces.)
524                        if let &[single_col] = &**projection {
525                            (project_input, single_col)
526                        } else {
527                            break 'fusion None;
528                        }
529                    } else {
530                        (flat_map_input, 0)
531                    };
532                    if let MirRelationExpr::Reduce {
533                        input,
534                        group_key,
535                        aggregates,
536                        monotonic,
537                        expected_group_size,
538                    } = &**maybe_reduce
539                    {
540                        if group_key.len() != num_grouping_keys
541                            || aggregates.len() != 1
542                            || !aggregates[0].func.can_fuse_with_unnest_list()
543                        {
544                            break 'fusion None;
545                        }
546                        // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
547                        // above the FlatMap. Later, we'll mutate this to be the residual MFP that
548                        // didn't get fused into the `Reduce`.
549                        let non_fused_mfp_above_flat_map = &mut mfp;
550                        let reduce_output_arity = num_grouping_keys + 1;
551                        // We are fusing away the list that the FlatMap would have been unnesting,
552                        // so the column that had that list disappears, so we have to permute the
553                        // MFP above the FlatMap with this column disappearance.
554                        let tweaked_mfp = {
555                            let mut mfp = non_fused_mfp_above_flat_map.clone();
556                            if mfp.demand().contains(&0) {
557                                // I don't think this can happen currently that this MFP would
558                                // refer to the list column, because both the list column and the
559                                // MFP were constructed by the HIR-to-MIR lowering, so it's not just
560                                // some random MFP that we are seeing here. But anyhow, it's better
561                                // to check this here for robustness against future code changes.
562                                break 'fusion None;
563                            }
564                            let permutation: BTreeMap<_, _> =
565                                (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
566                            mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
567                            mfp
568                        };
569                        // We now put together the project that was before the FlatMap, and the
570                        // tweaked version of the MFP that was after the FlatMap.
571                        // (Part of this MFP might be fused into the Reduce.)
572                        let mut project_and_tweaked_mfp = {
573                            let mut mfp = MapFilterProject::new(reduce_output_arity);
574                            mfp = mfp.project(vec![num_grouping_keys]);
575                            mfp = MapFilterProject::compose(mfp, tweaked_mfp);
576                            mfp
577                        };
578                        let fused = self.lower_reduce(
579                            input,
580                            group_key,
581                            aggregates,
582                            monotonic,
583                            expected_group_size,
584                            &mut project_and_tweaked_mfp,
585                            true,
586                        )?;
587                        // Update the residual MFP.
588                        *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
589                        Some(fused)
590                    } else {
591                        break 'fusion None;
592                    }
593                };
594                if let Some(fused_with_reduce) = fused_with_reduce {
595                    fused_with_reduce
596                } else {
597                    // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
598                    let LoweredExpr {
599                        plan: input,
600                        keys,
601                        has_future_updates: input_future,
602                    } = self.lower_mir_expr(flat_map_input)?;
603                    // This stage can absorb arbitrary MFP instances.
604                    let mfp = mfp.take();
605                    let mut exprs = exprs.clone();
606                    let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
607                    {
608                        // We don't permute the MFP here, because it runs _after_ the table function,
609                        // whose output is in a fixed order.
610                        //
611                        // We _do_, however, need to permute the `expr`s that provide input to the
612                        // `func`.
613                        let permutation = permutation.iter().cloned().enumerate().collect();
614                        for expr in &mut exprs {
615                            expr.permute_map(&permutation);
616                        }
617
618                        Some(k.clone())
619                    } else {
620                        None
621                    };
622
623                    let lir_id = self.allocate_lir_id();
624                    // The absorbed `mfp` may contain temporal predicates, which can
625                    // introduce future-stamped updates that aren't present on the input.
626                    let has_future_updates = input_future || mfp.has_temporal_predicates();
627                    // Return the plan, and no arrangements.
628                    LoweredExpr {
629                        plan: PlanNode::FlatMap {
630                            input_key,
631                            input: Box::new(input),
632                            exprs: exprs.clone(),
633                            func: func.clone(),
634                            mfp_after: mfp,
635                        }
636                        .as_plan(lir_id),
637                        keys: AvailableCollections::new_raw(),
638                        has_future_updates,
639                    }
640                }
641            }
642            MirRelationExpr::Join {
643                inputs,
644                equivalences,
645                implementation,
646            } => {
647                // Plan each of the join inputs independently.
648                // The `plans` get surfaced upwards, and the `input_keys` should
649                // be used as part of join planning / to validate the existing
650                // plans / to aid in indexed seeding of update streams.
651                let mut plans = Vec::new();
652                let mut input_keys = Vec::new();
653                let mut input_arities = Vec::new();
654                let mut input_futures = Vec::new();
655                for input in inputs.iter() {
656                    let LoweredExpr {
657                        plan,
658                        keys,
659                        has_future_updates: input_future,
660                    } = self.lower_mir_expr(input)?;
661                    input_arities.push(input.arity());
662                    plans.push(plan);
663                    input_keys.push(keys);
664                    input_futures.push(input_future);
665                }
666                let any_input_future = input_futures.iter().any(|&f| f);
667
668                let input_mapper =
669                    JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
670
671                // Extract temporal predicates as joins cannot currently absorb them.
672                let (plan, missing) = match implementation {
673                    IndexedFilter(_coll_id, _idx_id, key, _val) => {
674                        // Start with the constant input. (This used to be important before database-issues#4016
675                        // was fixed.)
676                        let start: usize = 1;
677                        let order = vec![(0usize, key.clone(), None)];
678                        // All columns of the constant input will be part of the arrangement key.
679                        let source_arrangement = (
680                            (0..key.len())
681                                .map(MirScalarExpr::column)
682                                .collect::<Vec<_>>(),
683                            (0..key.len()).collect::<Vec<_>>(),
684                            Vec::<usize>::new(),
685                        );
686                        let (ljp, missing) = LinearJoinPlan::create_from(
687                            start,
688                            Some(&source_arrangement),
689                            equivalences,
690                            &order,
691                            input_mapper,
692                            &mut mfp,
693                            &input_keys,
694                        );
695                        (JoinPlan::Linear(ljp), missing)
696                    }
697                    Differential((start, start_arr, _start_characteristic), order) => {
698                        let source_arrangement = start_arr.as_ref().and_then(|key| {
699                            input_keys[*start]
700                                .arranged
701                                .iter()
702                                .find(|(k, _, _)| k == key)
703                                .clone()
704                        });
705                        let (ljp, missing) = LinearJoinPlan::create_from(
706                            *start,
707                            source_arrangement,
708                            equivalences,
709                            order,
710                            input_mapper,
711                            &mut mfp,
712                            &input_keys,
713                        );
714                        (JoinPlan::Linear(ljp), missing)
715                    }
716                    DeltaQuery(orders) => {
717                        let (djp, missing) = DeltaJoinPlan::create_from(
718                            equivalences,
719                            orders,
720                            input_mapper,
721                            &mut mfp,
722                            &input_keys,
723                        );
724                        (JoinPlan::Delta(djp), missing)
725                    }
726                    // Other plans are errors, and should be reported as such.
727                    Unimplemented => return Err("unimplemented join".to_string()),
728                };
729                // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
730                // `missing`. We thus need to plan them here so they'll exist.
731                let is_delta = matches!(plan, JoinPlan::Delta(_));
732                for ((((input_plan, input_keys), missing), arity), input_future) in plans
733                    .iter_mut()
734                    .zip_eq(input_keys.iter())
735                    .zip_eq(missing)
736                    .zip_eq(input_arities.iter().cloned())
737                    .zip_eq(input_futures.iter().copied())
738                {
739                    if missing != Default::default() {
740                        if is_delta {
741                            // join_implementation.rs produced a sub-optimal plan here;
742                            // we shouldn't plan delta joins at all if not all of the required
743                            // arrangements are available. Soft panic in CI and log an error in
744                            // production to increase the chances that we will catch all situations
745                            // that violate this constraint.
746                            soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
747Dataflow info: {}
748This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
749                        } else {
750                            soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
751Dataflow info: {}
752This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
753                            // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
754                            // Join input. (Earlier, they were missing in the following cases:
755                            //  - They were const-folded away for constant inputs. This is not
756                            //    happening since
757                            //    https://github.com/MaterializeInc/materialize/pull/16351
758                            //  - They were not being inserted for the constant input of
759                            //    `IndexedFilter`s. This was fixed in
760                            //    https://github.com/MaterializeInc/materialize/pull/20920
761                            //  - They were not being inserted for the first input of Differential
762                            //    joins. This was fixed in
763                            //    https://github.com/MaterializeInc/materialize/pull/16099)
764                        }
765                        let lir_id = self.allocate_lir_id();
766                        let raw_plan = std::mem::replace(
767                            input_plan,
768                            PlanNode::Constant {
769                                rows: Ok(Vec::new()),
770                            }
771                            .as_plan(lir_id),
772                        );
773                        *input_plan =
774                            self.arrange_by(raw_plan, missing, input_keys, arity, input_future);
775                    }
776                }
777                // Return the plan, and no arrangements.
778                // Both linear and delta join planning extract temporal predicates back into the
779                // residual `mfp` (see `LinearJoinPlan::create_from` / `DeltaJoinPlan::create_from`),
780                // so the absorbed MFP cannot introduce future updates — the join's output future
781                // flag is just the OR of its inputs.
782                let lir_id = self.allocate_lir_id();
783                LoweredExpr {
784                    plan: PlanNode::Join {
785                        inputs: plans,
786                        plan,
787                    }
788                    .as_plan(lir_id),
789                    keys: AvailableCollections::new_raw(),
790                    has_future_updates: any_input_future,
791                }
792            }
793            MirRelationExpr::Reduce {
794                input,
795                group_key,
796                aggregates,
797                monotonic,
798                expected_group_size,
799            } => {
800                if aggregates
801                    .iter()
802                    .any(|agg| agg.func.can_fuse_with_unnest_list())
803                {
804                    // This case should have been handled at the `MirRelationExpr::FlatMap` case
805                    // above. But that has a pretty complicated pattern matching, so it's not
806                    // unthinkable that it fails.
807                    soft_panic_or_log!(
808                        "Window function performance issue: `reduce_unnest_list_fusion` failed"
809                    );
810                }
811                self.lower_reduce(
812                    input,
813                    group_key,
814                    aggregates,
815                    monotonic,
816                    expected_group_size,
817                    &mut mfp,
818                    false,
819                )?
820            }
821            MirRelationExpr::TopK {
822                input,
823                group_key,
824                order_key,
825                limit,
826                offset,
827                monotonic,
828                expected_group_size,
829            } => {
830                let arity = input.arity();
831                let LoweredExpr {
832                    plan: input,
833                    keys,
834                    has_future_updates: input_future,
835                } = self.lower_mir_expr(input)?;
836
837                let top_k_plan = TopKPlan::create_from(
838                    group_key.clone(),
839                    order_key.clone(),
840                    *offset,
841                    limit.clone(),
842                    arity,
843                    *monotonic,
844                    *expected_group_size,
845                );
846
847                // We don't have an MFP here -- install an operator to permute the
848                // input, if necessary.
849                let input = if !keys.raw {
850                    self.arrange_by(
851                        input,
852                        AvailableCollections::new_raw(),
853                        &keys,
854                        arity,
855                        input_future,
856                    )
857                } else {
858                    input
859                };
860                // Return the plan, and no arrangements.
861                let lir_id = self.allocate_lir_id();
862                LoweredExpr {
863                    plan: PlanNode::TopK {
864                        input: Box::new(input),
865                        top_k_plan,
866                    }
867                    .as_plan(lir_id),
868                    keys: AvailableCollections::new_raw(),
869                    has_future_updates: input_future,
870                }
871            }
872            MirRelationExpr::Negate { input } => {
873                let arity = input.arity();
874                let LoweredExpr {
875                    plan: input,
876                    keys,
877                    has_future_updates: input_future,
878                } = self.lower_mir_expr(input)?;
879
880                // We don't have an MFP here -- install an operator to permute the
881                // input, if necessary.
882                let input = if !keys.raw {
883                    self.arrange_by(
884                        input,
885                        AvailableCollections::new_raw(),
886                        &keys,
887                        arity,
888                        input_future,
889                    )
890                } else {
891                    input
892                };
893                // Return the plan, and no arrangements.
894                let lir_id = self.allocate_lir_id();
895                LoweredExpr {
896                    plan: PlanNode::Negate {
897                        input: Box::new(input),
898                    }
899                    .as_plan(lir_id),
900                    keys: AvailableCollections::new_raw(),
901                    has_future_updates: input_future,
902                }
903            }
904            MirRelationExpr::Threshold { input } => {
905                let LoweredExpr {
906                    plan,
907                    keys,
908                    has_future_updates: input_future,
909                } = self.lower_mir_expr(input)?;
910                let arity = input.arity();
911                let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
912                let plan = if !keys
913                    .arranged
914                    .iter()
915                    .any(|(key, _, _)| key == &required_arrangement.0)
916                {
917                    self.arrange_by(
918                        plan,
919                        AvailableCollections::new_arranged(vec![required_arrangement]),
920                        &keys,
921                        arity,
922                        input_future,
923                    )
924                } else {
925                    plan
926                };
927
928                let output_keys = threshold_plan.keys();
929                // Return the plan, and any produced keys.
930                let lir_id = self.allocate_lir_id();
931                LoweredExpr {
932                    plan: PlanNode::Threshold {
933                        input: Box::new(plan),
934                        threshold_plan,
935                    }
936                    .as_plan(lir_id),
937                    keys: output_keys,
938                    has_future_updates: input_future,
939                }
940            }
941            MirRelationExpr::Union { base, inputs } => {
942                let arity = base.arity();
943                let mut lowered_inputs = Vec::with_capacity(1 + inputs.len());
944                lowered_inputs.push(self.lower_mir_expr(base)?);
945                for input in inputs.iter() {
946                    lowered_inputs.push(self.lower_mir_expr(input)?);
947                }
948                let any_future = lowered_inputs.iter().any(|l| l.has_future_updates);
949                let plans = lowered_inputs
950                    .into_iter()
951                    .map(
952                        |LoweredExpr {
953                             plan,
954                             keys,
955                             has_future_updates: future,
956                         }| {
957                            // We don't have an MFP here -- install an operator to permute the
958                            // input, if necessary.
959                            if !keys.raw {
960                                self.arrange_by(
961                                    plan,
962                                    AvailableCollections::new_raw(),
963                                    &keys,
964                                    arity,
965                                    future,
966                                )
967                            } else {
968                                plan
969                            }
970                        },
971                    )
972                    .collect();
973                // Return the plan and no arrangements.
974                let lir_id = self.allocate_lir_id();
975                LoweredExpr {
976                    plan: PlanNode::Union {
977                        inputs: plans,
978                        consolidate_output: false,
979                    }
980                    .as_plan(lir_id),
981                    keys: AvailableCollections::new_raw(),
982                    has_future_updates: any_future,
983                }
984            }
985            MirRelationExpr::ArrangeBy { input, keys } => {
986                let input_mir = input;
987                let LoweredExpr {
988                    plan: input,
989                    keys: mut input_keys,
990                    has_future_updates: future,
991                } = self.lower_mir_expr(input)?;
992                // Fill the `types` in `input_keys` if not already present.
993                let arity = input_mir.arity();
994
995                // Determine keys that are not present in `input_keys`.
996                let new_keys = keys
997                    .iter()
998                    .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
999                    .cloned()
1000                    .collect::<Vec<_>>();
1001                if new_keys.is_empty() {
1002                    LoweredExpr {
1003                        plan: input,
1004                        keys: input_keys,
1005                        has_future_updates: future,
1006                    }
1007                } else {
1008                    let mut new_keys = new_keys
1009                        .iter()
1010                        .cloned()
1011                        .map(|k| {
1012                            let (permutation, thinning) = permutation_for_arrangement(&k, arity);
1013                            (k, permutation, thinning)
1014                        })
1015                        .collect::<Vec<_>>();
1016                    let forms = AvailableCollections {
1017                        raw: input_keys.raw,
1018                        arranged: new_keys.clone(),
1019                    };
1020                    let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1021                        input_keys.arbitrary_arrangement()
1022                    {
1023                        let mut mfp = MapFilterProject::new(arity);
1024                        mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1025                        (Some(input_key.clone()), mfp)
1026                    } else {
1027                        (None, MapFilterProject::new(arity))
1028                    };
1029                    input_keys.arranged.append(&mut new_keys);
1030                    input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1031
1032                    // Return the plan and extended keys.
1033                    let lir_id = self.allocate_lir_id();
1034                    LoweredExpr {
1035                        plan: PlanNode::ArrangeBy {
1036                            input_key,
1037                            input: Box::new(input),
1038                            input_mfp,
1039                            forms,
1040                            strategy: strategy_from_future(future),
1041                        }
1042                        .as_plan(lir_id),
1043                        keys: input_keys,
1044                        has_future_updates: future,
1045                    }
1046                }
1047            }
1048        };
1049
1050        // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
1051        if !mfp.is_identity() {
1052            // Check if this MFP introduces future updates.
1053            let mfp_is_temporal = mfp.has_temporal_predicates();
1054            has_future_updates = has_future_updates || mfp_is_temporal;
1055            // Seek out an arrangement key that might be constrained to a literal.
1056            // TODO: Improve key selection heuristic.
1057            let key_val = keys
1058                .arranged
1059                .iter()
1060                .filter_map(|(key, permutation, thinning)| {
1061                    let mut mfp = mfp.clone();
1062                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1063                    mfp.literal_constraints(key)
1064                        .map(|val| (key.clone(), permutation, thinning, val))
1065                })
1066                .max_by_key(|(key, _, _, _)| key.len());
1067
1068            // Input key selection strategy:
1069            // (1) If we can read a key at a particular value, do so
1070            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
1071            // therefore allows us to avoid discarding the arrangement, use that.
1072            // (3) Otherwise, if there is _some_ key, use that,
1073            // (4) Otherwise just read the raw collection.
1074            let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
1075                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1076
1077                Some((key, Some(val)))
1078            } else if let Some((key, permutation, thinning)) =
1079                keys.arranged.iter().find(|(key, permutation, thinning)| {
1080                    let mut mfp = mfp.clone();
1081                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1082                    mfp.is_identity()
1083                })
1084            {
1085                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1086                Some((key.clone(), None))
1087            } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
1088                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1089                Some((key.clone(), None))
1090            } else {
1091                None
1092            };
1093
1094            if mfp.is_identity() {
1095                // We have discovered a key
1096                // whose permutation causes the MFP to actually
1097                // be the identity! We can keep it around,
1098                // but without its permutation this time,
1099                // and with a trivial thinning of the right length.
1100                let (key, val) = input_key_val.unwrap();
1101                let (_old_key, old_permutation, old_thinning) = keys
1102                    .arranged
1103                    .iter_mut()
1104                    .find(|(key2, _, _)| key2 == &key)
1105                    .unwrap();
1106                *old_permutation = (0..mfp.input_arity).collect();
1107                let old_thinned_arity = old_thinning.len();
1108                *old_thinning = (0..old_thinned_arity).collect();
1109                // Get rid of all other forms, as this is now the only one known to be valid.
1110                // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
1111                // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
1112                keys.arranged.retain(|(key2, _, _)| key2 == &key);
1113                keys.raw = false;
1114
1115                // Creating a Plan::Mfp node is now logically unnecessary, but we
1116                // should do so anyway when `val` is populated, so that
1117                // the `key_val` optimization gets applied.
1118                let lir_id = self.allocate_lir_id();
1119                if val.is_some() {
1120                    plan = PlanNode::Mfp {
1121                        input: Box::new(plan),
1122                        mfp,
1123                        input_key_val: Some((key, val)),
1124                    }
1125                    .as_plan(lir_id)
1126                }
1127            } else {
1128                let lir_id = self.allocate_lir_id();
1129                plan = PlanNode::Mfp {
1130                    input: Box::new(plan),
1131                    mfp,
1132                    input_key_val,
1133                }
1134                .as_plan(lir_id);
1135                keys = AvailableCollections::new_raw();
1136            }
1137        }
1138
1139        Ok(LoweredExpr {
1140            plan,
1141            keys,
1142            has_future_updates,
1143        })
1144    }
1145
1146    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
1147    /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
1148    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
1149    /// fused.
1150    fn lower_reduce(
1151        &mut self,
1152        input: &MirRelationExpr,
1153        group_key: &Vec<MirScalarExpr>,
1154        aggregates: &Vec<AggregateExpr>,
1155        monotonic: &bool,
1156        expected_group_size: &Option<u64>,
1157        mfp_on_top: &mut MapFilterProject,
1158        fused_unnest_list: bool,
1159    ) -> Result<LoweredExpr, String> {
1160        let input_arity = input.arity();
1161        let LoweredExpr {
1162            plan: input,
1163            keys,
1164            has_future_updates: input_future,
1165        } = self.lower_mir_expr(input)?;
1166        let (input_key, permutation_and_new_arity) =
1167            if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
1168                (
1169                    Some(input_key.clone()),
1170                    Some((permutation.clone(), thinning.len() + input_key.len())),
1171                )
1172            } else {
1173                (None, None)
1174            };
1175        let key_val_plan = KeyValPlan::new(
1176            input_arity,
1177            group_key,
1178            aggregates,
1179            permutation_and_new_arity,
1180        );
1181        let reduce_plan = ReducePlan::create_from(
1182            aggregates.clone(),
1183            *monotonic,
1184            *expected_group_size,
1185            fused_unnest_list,
1186        );
1187        // Return the plan, and the keys it produces.
1188        let mfp_after;
1189        let output_arity;
1190        if self.enable_reduce_mfp_fusion {
1191            (mfp_after, *mfp_on_top, output_arity) =
1192                reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1193        } else {
1194            (mfp_after, output_arity) = (
1195                MapFilterProject::new(mfp_on_top.input_arity),
1196                group_key.len() + aggregates.len(),
1197            );
1198        }
1199        soft_assert_eq_or_log!(
1200            mfp_on_top.input_arity,
1201            output_arity,
1202            "Output arity of reduce must match input arity for MFP on top of it"
1203        );
1204        let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1205        let lir_id = self.allocate_lir_id();
1206        // `extract_mfp_after` strips temporal predicates back into `*mfp_on_top` (the residual
1207        // MFP installed above the reduce), so `mfp_after` is non-temporal and cannot introduce
1208        // future updates. The output's future flag is just whatever the input had.
1209        Ok(LoweredExpr {
1210            plan: PlanNode::Reduce {
1211                input_key,
1212                input: Box::new(input),
1213                key_val_plan,
1214                plan: reduce_plan,
1215                mfp_after,
1216            }
1217            .as_plan(lir_id),
1218            keys: output_keys,
1219            has_future_updates: input_future,
1220        })
1221    }
1222
1223    /// Replace the plan with another one
1224    /// that has the collection in some additional forms.
1225    pub fn arrange_by(
1226        &mut self,
1227        plan: Plan,
1228        collections: AvailableCollections,
1229        old_collections: &AvailableCollections,
1230        arity: usize,
1231        has_future_updates: bool,
1232    ) -> Plan {
1233        if let Plan {
1234            node:
1235                PlanNode::ArrangeBy {
1236                    input_key,
1237                    input,
1238                    input_mfp,
1239                    mut forms,
1240                    strategy,
1241                },
1242            lir_id,
1243        } = plan
1244        {
1245            forms.raw |= collections.raw;
1246            forms.arranged.extend(collections.arranged);
1247            forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1248            forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1249            PlanNode::ArrangeBy {
1250                input_key,
1251                input,
1252                input_mfp,
1253                forms,
1254                strategy,
1255            }
1256            .as_plan(lir_id)
1257        } else {
1258            let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1259                old_collections.arbitrary_arrangement()
1260            {
1261                let mut mfp = MapFilterProject::new(arity);
1262                mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1263                (Some(input_key.clone()), mfp)
1264            } else {
1265                (None, MapFilterProject::new(arity))
1266            };
1267            let lir_id = self.allocate_lir_id();
1268
1269            PlanNode::ArrangeBy {
1270                input_key,
1271                input: Box::new(plan),
1272                input_mfp,
1273                forms: collections,
1274                strategy: strategy_from_future(has_future_updates),
1275            }
1276            .as_plan(lir_id)
1277        }
1278    }
1279}
1280
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
///
/// Rendered through its `Display` impl as `Debug name: <debug_name>; id: <id>`. That text
/// appears as the `Dataflow info` in the `soft_panic_or_log!` messages emitted when a
/// join's required arrangements are missing.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Name printed after `Debug name:` in diagnostics.
    /// NOTE(review): presumably the debug name of the dataflow being lowered — confirm
    /// against the constructor.
    debug_name: String,
    /// Id printed after `id:` in diagnostics.
    /// NOTE(review): presumably the id of the dataflow/export being lowered — confirm.
    id: GlobalId,
}
1288
1289impl std::fmt::Display for LirDebugInfo {
1290    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1291        write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1292    }
1293}