mz_compute_types/plan/
lowering.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::BTreeMap;
13
14use columnar::Len;
15use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
16use mz_expr::{
17    AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
18    OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
19};
20use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
21use mz_repr::GlobalId;
22use mz_repr::optimize::OptimizerFeatures;
23use timely::Container;
24use timely::progress::Timestamp;
25
26use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
27use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
28use crate::plan::reduce::{KeyValPlan, ReducePlan};
29use crate::plan::threshold::ThresholdPlan;
30use crate::plan::top_k::TopKPlan;
31use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
32
33pub(super) struct Context {
34    /// Known bindings to (possibly arranged) collections.
35    arrangements: BTreeMap<Id, AvailableCollections>,
36    /// Tracks the next available `LirId`.
37    next_lir_id: LirId,
38    /// Information to print along with error messages.
39    debug_info: LirDebugInfo,
40    /// Whether to enable fusion of MFPs in reductions.
41    enable_reduce_mfp_fusion: bool,
42}
43
44impl Context {
45    pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
46        Self {
47            arrangements: Default::default(),
48            next_lir_id: LirId(1),
49            debug_info: LirDebugInfo {
50                debug_name,
51                id: GlobalId::Transient(0),
52            },
53            enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
54        }
55    }
56
57    fn allocate_lir_id(&mut self) -> LirId {
58        let id = self.next_lir_id;
59        self.next_lir_id = LirId(
60            self.next_lir_id
61                .0
62                .checked_add(1)
63                .expect("No LirId overflow"),
64        );
65        id
66    }
67
68    pub fn lower<T: Timestamp>(
69        mut self,
70        desc: DataflowDescription<OptimizedMirRelationExpr>,
71    ) -> Result<DataflowDescription<Plan<T>>, String> {
72        // Sources might provide arranged forms of their data, in the future.
73        // Indexes provide arranged forms of their data.
74        for IndexImport {
75            desc: index_desc,
76            typ,
77            ..
78        } in desc.index_imports.values()
79        {
80            let key = index_desc.key.clone();
81            // TODO[btv] - We should be told the permutation by
82            // `index_desc`, and it should have been generated
83            // at the same point the thinning logic was.
84            //
85            // We should for sure do that soon, but it requires
86            // a bit of a refactor, so for now we just
87            // _assume_ that they were both generated by `permutation_for_arrangement`,
88            // and recover it here.
89            let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
90            let index_keys = self
91                .arrangements
92                .entry(Id::Global(index_desc.on_id))
93                .or_insert_with(AvailableCollections::default);
94            index_keys.arranged.push((key, permutation, thinning));
95        }
96        for id in desc.source_imports.keys() {
97            self.arrangements
98                .entry(Id::Global(*id))
99                .or_insert_with(AvailableCollections::new_raw);
100        }
101
102        // Build each object in order, registering the arrangements it forms.
103        let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
104        for build in desc.objects_to_build {
105            self.debug_info.id = build.id;
106            let (plan, keys) = self.lower_mir_expr(&build.plan)?;
107
108            self.arrangements.insert(Id::Global(build.id), keys);
109            objects_to_build.push(BuildDesc { id: build.id, plan });
110        }
111
112        Ok(DataflowDescription {
113            source_imports: desc.source_imports,
114            index_imports: desc.index_imports,
115            objects_to_build,
116            index_exports: desc.index_exports,
117            sink_exports: desc.sink_exports,
118            as_of: desc.as_of,
119            until: desc.until,
120            initial_storage_as_of: desc.initial_storage_as_of,
121            refresh_schedule: desc.refresh_schedule,
122            debug_name: desc.debug_name,
123            time_dependence: desc.time_dependence,
124        })
125    }
126
127    /// This method converts a MirRelationExpr into a plan that can be directly rendered.
128    ///
129    /// The rough structure is that we repeatedly extract map/filter/project operators
130    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
131    /// then produce a plan for the combination of that with the next operator.
132    ///
133    /// The method accesses `self.arrangements`, which it will locally add to and remove from for
134    /// `Let` bindings (by the end of the call it should contain the same bindings as when it
135    /// started).
136    ///
137    /// The result of the method is both a `Plan`, but also a list of arrangements that
138    /// are certain to be produced, which can be relied on by the next steps in the plan.
139    /// Each of the arrangement keys is associated with an MFP that must be applied if that
140    /// arrangement is used, to back out the permutation associated with that arrangement.
141    ///
142    /// An empty list of arrangement keys indicates that only a `Collection` stream can
143    /// be assumed to exist.
144    fn lower_mir_expr<T: Timestamp>(
145        &mut self,
146        expr: &MirRelationExpr,
147    ) -> Result<(Plan<T>, AvailableCollections), String> {
148        // This function is recursive and can overflow its stack, so grow it if
149        // needed. The growth here is unbounded. Our general solution for this problem
150        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
151        // depth. That however requires upstream error handling. This function is
152        // currently called by the Coordinator after calls to `catalog_transact`,
153        // and thus are not allowed to fail. Until that allows errors, we choose
154        // to allow the unbounded growth here. We are though somewhat protected by
155        // higher levels enforcing their own limits on stack depth (in the parser,
156        // transformer/desugarer, and planner).
157        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
158    }
159
160    fn lower_mir_expr_stack_safe<T>(
161        &mut self,
162        expr: &MirRelationExpr,
163    ) -> Result<(Plan<T>, AvailableCollections), String>
164    where
165        T: Timestamp,
166    {
167        // Extract a maximally large MapFilterProject from `expr`.
168        // We will then try and push this in to the resulting expression.
169        //
170        // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
171        // While we would eventually like all plan stages to be able to absorb such
172        // general operators, not all of them can.
173        let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
174        // We attempt to plan what we have remaining, in the context of `mfp`.
175        // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
176        let (mut plan, mut keys) = match expr {
177            // These operators should have been extracted from the expression.
178            MirRelationExpr::Map { .. } => {
179                panic!("This operator should have been extracted");
180            }
181            MirRelationExpr::Filter { .. } => {
182                panic!("This operator should have been extracted");
183            }
184            MirRelationExpr::Project { .. } => {
185                panic!("This operator should have been extracted");
186            }
187            // These operators may not have been extracted, and need to result in a `Plan`.
188            MirRelationExpr::Constant { rows, typ: _ } => {
189                let lir_id = self.allocate_lir_id();
190                let node = PlanNode::Constant {
191                    rows: rows.clone().map(|rows| {
192                        rows.into_iter()
193                            .map(|(row, diff)| (row, T::minimum(), diff))
194                            .collect()
195                    }),
196                };
197                // The plan, not arranged in any way.
198                (node.as_plan(lir_id), AvailableCollections::new_raw())
199            }
200            MirRelationExpr::Get { id, typ: _, .. } => {
201                // This stage can absorb arbitrary MFP operators.
202                let mut mfp = mfp.take();
203                // If `mfp` is the identity, we can surface all imported arrangements.
204                // Otherwise, we apply `mfp` and promise no arrangements.
205                let mut in_keys = self
206                    .arrangements
207                    .get(id)
208                    .cloned()
209                    .unwrap_or_else(AvailableCollections::new_raw);
210
211                // Seek out an arrangement key that might be constrained to a literal.
212                // Note: this code has very little use nowadays, as its job was mostly taken over
213                // by `LiteralConstraints` (see in the below longer comment).
214                let key_val = in_keys
215                    .arranged
216                    .iter()
217                    .filter_map(|key| {
218                        mfp.literal_constraints(&key.0)
219                            .map(|val| (key.clone(), val))
220                    })
221                    .max_by_key(|(key, _val)| key.0.len());
222
223                // Determine the plan of action for the `Get` stage.
224                let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
225                    // This code path used to handle looking up literals from indexes, but it's
226                    // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
227                    // MIR transform instead. However, it's still called in a couple of tricky
228                    // special cases:
229                    // - `LiteralConstraints` handles only Gets of global ids, so this code still
230                    //   gets to handle Filters on top of Gets of local ids.
231                    // - Lowering does a `MapFilterProject::extract_from_expression`, while
232                    //   `LiteralConstraints` does
233                    //   `MapFilterProject::extract_non_errors_from_expr_mut`.
234                    // - It might happen that new literal constraint optimization opportunities
235                    //   appear somewhere near the end of the MIR optimizer after
236                    //   `LiteralConstraints` has already run.
237                    // (Also note that a similar literal constraint handling machinery is also
238                    // present when handling the leftover MFP after this big match.)
239                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
240                    in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
241                    GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
242                } else if !mfp.is_identity() {
243                    // We need to ensure a collection exists, which means we must form it.
244                    if let Some((key, permutation, thinning)) =
245                        in_keys.arbitrary_arrangement().cloned()
246                    {
247                        mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
248                        in_keys.arranged = vec![(key.clone(), permutation, thinning)];
249                        GetPlan::Arrangement(key, None, mfp)
250                    } else {
251                        GetPlan::Collection(mfp)
252                    }
253                } else {
254                    // By default, just pass input arrangements through.
255                    GetPlan::PassArrangements
256                };
257
258                let out_keys = if let GetPlan::PassArrangements = plan {
259                    in_keys.clone()
260                } else {
261                    AvailableCollections::new_raw()
262                };
263
264                let lir_id = self.allocate_lir_id();
265                let node = PlanNode::Get {
266                    id: id.clone(),
267                    keys: in_keys,
268                    plan,
269                };
270                // Return the plan, and any keys if an identity `mfp`.
271                (node.as_plan(lir_id), out_keys)
272            }
273            MirRelationExpr::Let { id, value, body } => {
274                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
275                // that they would be pushed down. I am not sure if we should take the
276                // initiative to push down the `mfp` ourselves.
277
278                // Plan the value using only the initial arrangements, but
279                // introduce any resulting arrangements bound to `id`.
280                let (value, v_keys) = self.lower_mir_expr(value)?;
281                let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
282                assert_none!(pre_existing);
283                // Plan the body using initial and `value` arrangements,
284                // and then remove reference to the value arrangements.
285                let (body, b_keys) = self.lower_mir_expr(body)?;
286                self.arrangements.remove(&Id::Local(*id));
287                // Return the plan, and any `body` arrangements.
288                let lir_id = self.allocate_lir_id();
289                (
290                    PlanNode::Let {
291                        id: id.clone(),
292                        value: Box::new(value),
293                        body: Box::new(body),
294                    }
295                    .as_plan(lir_id),
296                    b_keys,
297                )
298            }
299            MirRelationExpr::LetRec {
300                ids,
301                values,
302                limits,
303                body,
304            } => {
305                assert_eq!(ids.len(), values.len());
306                assert_eq!(ids.len(), limits.len());
307                // Plan the values using only the available arrangements, but
308                // introduce any resulting arrangements bound to each `id`.
309                // Arrangements made available cannot be used by prior bindings,
310                // as we cannot circulate an arrangement through a `Variable` yet.
311                let mut lir_values = Vec::with_capacity(values.len());
312                for (id, value) in ids.iter().zip(values) {
313                    let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
314                    // If `v_keys` does not contain an unarranged collection, we must form it.
315                    if !v_keys.raw {
316                        // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
317                        let (input_key, permutation, thinning) =
318                            v_keys.arbitrary_arrangement().unwrap();
319                        let mut input_mfp = MapFilterProject::new(value.arity());
320                        input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
321                        let input_key = Some(input_key.clone());
322
323                        let forms = AvailableCollections::new_raw();
324
325                        // We just want to insert an `ArrangeBy` to form an unarranged collection,
326                        // but there is a complication: We shouldn't break the invariant (created by
327                        // `NormalizeLets`, and relied upon by the rendering) that there isn't
328                        // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
329                        // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
330                        // instead of on top of the inner `LetRec`.
331                        lir_value = match lir_value {
332                            Plan {
333                                node:
334                                    PlanNode::LetRec {
335                                        ids,
336                                        values,
337                                        limits,
338                                        body,
339                                    },
340                                lir_id,
341                            } => {
342                                let inner_lir_id = self.allocate_lir_id();
343                                PlanNode::LetRec {
344                                    ids,
345                                    values,
346                                    limits,
347                                    body: Box::new(
348                                        PlanNode::ArrangeBy {
349                                            input_key,
350                                            input: body,
351                                            input_mfp,
352                                            forms,
353                                        }
354                                        .as_plan(inner_lir_id),
355                                    ),
356                                }
357                                .as_plan(lir_id)
358                            }
359                            lir_value => {
360                                let lir_id = self.allocate_lir_id();
361                                PlanNode::ArrangeBy {
362                                    input_key,
363                                    input: Box::new(lir_value),
364                                    input_mfp,
365                                    forms,
366                                }
367                                .as_plan(lir_id)
368                            }
369                        };
370                        v_keys.raw = true;
371                    }
372                    let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
373                    assert_none!(pre_existing);
374                    lir_values.push(lir_value);
375                }
376                // As we exit the iterative scope, we must leave all arrangements behind,
377                // as they reference a timestamp coordinate that must be stripped off.
378                for id in ids.iter() {
379                    self.arrangements
380                        .insert(Id::Local(*id), AvailableCollections::new_raw());
381                }
382                // Plan the body using initial and `value` arrangements,
383                // and then remove reference to the value arrangements.
384                let (body, b_keys) = self.lower_mir_expr(body)?;
385                for id in ids.iter() {
386                    self.arrangements.remove(&Id::Local(*id));
387                }
388                // Return the plan, and any `body` arrangements.
389                let lir_id = self.allocate_lir_id();
390                (
391                    PlanNode::LetRec {
392                        ids: ids.clone(),
393                        values: lir_values,
394                        limits: limits.clone(),
395                        body: Box::new(body),
396                    }
397                    .as_plan(lir_id),
398                    b_keys,
399                )
400            }
401            MirRelationExpr::FlatMap {
402                input: flat_map_input,
403                func,
404                exprs,
405            } => {
406                // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
407                // fused into the lowered `Reduce`.
408                //
409                // In theory, we could have implemented this also as an MIR transform. However, this
410                // is more of a physical optimization, which are sometimes unpleasant to make a part
411                // of the MIR pipeline. The specific problem here with putting this into the MIR
412                // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
413                // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
414                // emit multiple rows per group. Such semantic changes of MIR are very scary, since
415                // various parts of the optimizer assume that Reduce emits only 1 row per group, and
416                // it would be very hard to hunt down all these parts. (For example, key inference
417                // infers the group key as a unique key.)
418                let fused_with_reduce = 'fusion: {
419                    if !matches!(func, TableFunc::UnnestList { .. }) {
420                        break 'fusion None;
421                    }
422                    // We might have a Project of a single col between the FlatMap and the
423                    // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
424                    // result of the window function.)
425                    let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
426                        input: project_input,
427                        outputs: projection,
428                    } = &**flat_map_input
429                    {
430                        // We want this to be a single column, because we'll want to deal with only
431                        // one aggregation in the `Reduce`. (The aggregation of a window function
432                        // always stands alone currently: we plan them separately from other
433                        // aggregations, and Reduces are never fused. When window functions are
434                        // fused with each other, they end up in one aggregation. When there are
435                        // multiple window functions in the same SELECT, but can't be fused, they
436                        // end up in different Reduces.)
437                        if let &[single_col] = &**projection {
438                            (project_input, single_col)
439                        } else {
440                            break 'fusion None;
441                        }
442                    } else {
443                        (flat_map_input, 0)
444                    };
445                    if let MirRelationExpr::Reduce {
446                        input,
447                        group_key,
448                        aggregates,
449                        monotonic,
450                        expected_group_size,
451                    } = &**maybe_reduce
452                    {
453                        if group_key.len() != num_grouping_keys
454                            || aggregates.len() != 1
455                            || !aggregates[0].func.can_fuse_with_unnest_list()
456                        {
457                            break 'fusion None;
458                        }
459                        // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
460                        // above the FlatMap. Later, we'll mutate this to be the residual MFP that
461                        // didn't get fused into the `Reduce`.
462                        let non_fused_mfp_above_flat_map = &mut mfp;
463                        let reduce_output_arity = num_grouping_keys + 1;
464                        // We are fusing away the list that the FlatMap would have been unnesting,
465                        // so the column that had that list disappears, so we have to permute the
466                        // MFP above the FlatMap with this column disappearance.
467                        let tweaked_mfp = {
468                            let mut mfp = non_fused_mfp_above_flat_map.clone();
469                            if mfp.demand().contains(&0) {
470                                // I don't think this can happen currently that this MFP would
471                                // refer to the list column, because both the list column and the
472                                // MFP were constructed by the HIR-to-MIR lowering, so it's not just
473                                // some random MFP that we are seeing here. But anyhow, it's better
474                                // to check this here for robustness against future code changes.
475                                break 'fusion None;
476                            }
477                            let permutation: BTreeMap<_, _> =
478                                (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
479                            mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
480                            mfp
481                        };
482                        // We now put together the project that was before the FlatMap, and the
483                        // tweaked version of the MFP that was after the FlatMap.
484                        // (Part of this MFP might be fused into the Reduce.)
485                        let mut project_and_tweaked_mfp = {
486                            let mut mfp = MapFilterProject::new(reduce_output_arity);
487                            mfp = mfp.project(vec![num_grouping_keys]);
488                            mfp = MapFilterProject::compose(mfp, tweaked_mfp);
489                            mfp
490                        };
491                        let fused = self.lower_reduce(
492                            input,
493                            group_key,
494                            aggregates,
495                            monotonic,
496                            expected_group_size,
497                            &mut project_and_tweaked_mfp,
498                            true,
499                        )?;
500                        // Update the residual MFP.
501                        *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
502                        Some(fused)
503                    } else {
504                        break 'fusion None;
505                    }
506                };
507                if let Some(fused_with_reduce) = fused_with_reduce {
508                    fused_with_reduce
509                } else {
510                    // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
511                    let (input, keys) = self.lower_mir_expr(flat_map_input)?;
512                    // This stage can absorb arbitrary MFP instances.
513                    let mfp = mfp.take();
514                    let mut exprs = exprs.clone();
515                    let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
516                    {
517                        // We don't permute the MFP here, because it runs _after_ the table function,
518                        // whose output is in a fixed order.
519                        //
520                        // We _do_, however, need to permute the `expr`s that provide input to the
521                        // `func`.
522                        let permutation = permutation.iter().cloned().enumerate().collect();
523                        for expr in &mut exprs {
524                            expr.permute_map(&permutation);
525                        }
526
527                        Some(k.clone())
528                    } else {
529                        None
530                    };
531
532                    let lir_id = self.allocate_lir_id();
533                    // Return the plan, and no arrangements.
534                    (
535                        PlanNode::FlatMap {
536                            input_key,
537                            input: Box::new(input),
538                            exprs: exprs.clone(),
539                            func: func.clone(),
540                            mfp_after: mfp,
541                        }
542                        .as_plan(lir_id),
543                        AvailableCollections::new_raw(),
544                    )
545                }
546            }
547            MirRelationExpr::Join {
548                inputs,
549                equivalences,
550                implementation,
551            } => {
552                // Plan each of the join inputs independently.
553                // The `plans` get surfaced upwards, and the `input_keys` should
554                // be used as part of join planning / to validate the existing
555                // plans / to aid in indexed seeding of update streams.
556                let mut plans = Vec::new();
557                let mut input_keys = Vec::new();
558                let mut input_arities = Vec::new();
559                for input in inputs.iter() {
560                    let (plan, keys) = self.lower_mir_expr(input)?;
561                    input_arities.push(input.arity());
562                    plans.push(plan);
563                    input_keys.push(keys);
564                }
565
566                let input_mapper =
567                    JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
568
569                // Extract temporal predicates as joins cannot currently absorb them.
570                let (plan, missing) = match implementation {
571                    IndexedFilter(_coll_id, _idx_id, key, _val) => {
572                        // Start with the constant input. (This used to be important before database-issues#4016
573                        // was fixed.)
574                        let start: usize = 1;
575                        let order = vec![(0usize, key.clone(), None)];
576                        // All columns of the constant input will be part of the arrangement key.
577                        let source_arrangement = (
578                            (0..key.len())
579                                .map(MirScalarExpr::column)
580                                .collect::<Vec<_>>(),
581                            (0..key.len()).collect::<Vec<_>>(),
582                            Vec::<usize>::new(),
583                        );
584                        let (ljp, missing) = LinearJoinPlan::create_from(
585                            start,
586                            Some(&source_arrangement),
587                            equivalences,
588                            &order,
589                            input_mapper,
590                            &mut mfp,
591                            &input_keys,
592                        );
593                        (JoinPlan::Linear(ljp), missing)
594                    }
595                    Differential((start, start_arr, _start_characteristic), order) => {
596                        let source_arrangement = start_arr.as_ref().and_then(|key| {
597                            input_keys[*start]
598                                .arranged
599                                .iter()
600                                .find(|(k, _, _)| k == key)
601                                .clone()
602                        });
603                        let (ljp, missing) = LinearJoinPlan::create_from(
604                            *start,
605                            source_arrangement,
606                            equivalences,
607                            order,
608                            input_mapper,
609                            &mut mfp,
610                            &input_keys,
611                        );
612                        (JoinPlan::Linear(ljp), missing)
613                    }
614                    DeltaQuery(orders) => {
615                        let (djp, missing) = DeltaJoinPlan::create_from(
616                            equivalences,
617                            orders,
618                            input_mapper,
619                            &mut mfp,
620                            &input_keys,
621                        );
622                        (JoinPlan::Delta(djp), missing)
623                    }
624                    // Other plans are errors, and should be reported as such.
625                    Unimplemented => return Err("unimplemented join".to_string()),
626                };
627                // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
628                // `missing`. We thus need to plan them here so they'll exist.
629                let is_delta = matches!(plan, JoinPlan::Delta(_));
630                for (((input_plan, input_keys), missing), arity) in plans
631                    .iter_mut()
632                    .zip(input_keys.iter())
633                    .zip(missing.into_iter())
634                    .zip(input_arities.iter().cloned())
635                {
636                    if missing != Default::default() {
637                        if is_delta {
638                            // join_implementation.rs produced a sub-optimal plan here;
639                            // we shouldn't plan delta joins at all if not all of the required
640                            // arrangements are available. Soft panic in CI and log an error in
641                            // production to increase the chances that we will catch all situations
642                            // that violate this constraint.
643                            soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
644Dataflow info: {}
645This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
646                        } else {
647                            soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
648Dataflow info: {}
649This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
650                            // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
651                            // Join input. (Earlier, they were missing in the following cases:
652                            //  - They were const-folded away for constant inputs. This is not
653                            //    happening since
654                            //    https://github.com/MaterializeInc/materialize/pull/16351
655                            //  - They were not being inserted for the constant input of
656                            //    `IndexedFilter`s. This was fixed in
657                            //    https://github.com/MaterializeInc/materialize/pull/20920
658                            //  - They were not being inserted for the first input of Differential
659                            //    joins. This was fixed in
660                            //    https://github.com/MaterializeInc/materialize/pull/16099)
661                        }
662                        let lir_id = self.allocate_lir_id();
663                        let raw_plan = std::mem::replace(
664                            input_plan,
665                            PlanNode::Constant {
666                                rows: Ok(Vec::new()),
667                            }
668                            .as_plan(lir_id),
669                        );
670                        *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
671                    }
672                }
673                // Return the plan, and no arrangements.
674                let lir_id = self.allocate_lir_id();
675                (
676                    PlanNode::Join {
677                        inputs: plans,
678                        plan,
679                    }
680                    .as_plan(lir_id),
681                    AvailableCollections::new_raw(),
682                )
683            }
684            MirRelationExpr::Reduce {
685                input,
686                group_key,
687                aggregates,
688                monotonic,
689                expected_group_size,
690            } => {
691                if aggregates
692                    .iter()
693                    .any(|agg| agg.func.can_fuse_with_unnest_list())
694                {
695                    // This case should have been handled at the `MirRelationExpr::FlatMap` case
696                    // above. But that has a pretty complicated pattern matching, so it's not
697                    // unthinkable that it fails.
698                    soft_panic_or_log!(
699                        "Window function performance issue: `reduce_unnest_list_fusion` failed"
700                    );
701                }
702                self.lower_reduce(
703                    input,
704                    group_key,
705                    aggregates,
706                    monotonic,
707                    expected_group_size,
708                    &mut mfp,
709                    false,
710                )?
711            }
712            MirRelationExpr::TopK {
713                input,
714                group_key,
715                order_key,
716                limit,
717                offset,
718                monotonic,
719                expected_group_size,
720            } => {
721                let arity = input.arity();
722                let (input, keys) = self.lower_mir_expr(input)?;
723
724                let top_k_plan = TopKPlan::create_from(
725                    group_key.clone(),
726                    order_key.clone(),
727                    *offset,
728                    limit.clone(),
729                    arity,
730                    *monotonic,
731                    *expected_group_size,
732                );
733
734                // We don't have an MFP here -- install an operator to permute the
735                // input, if necessary.
736                let input = if !keys.raw {
737                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
738                } else {
739                    input
740                };
741                // Return the plan, and no arrangements.
742                let lir_id = self.allocate_lir_id();
743                (
744                    PlanNode::TopK {
745                        input: Box::new(input),
746                        top_k_plan,
747                    }
748                    .as_plan(lir_id),
749                    AvailableCollections::new_raw(),
750                )
751            }
752            MirRelationExpr::Negate { input } => {
753                let arity = input.arity();
754                let (input, keys) = self.lower_mir_expr(input)?;
755
756                // We don't have an MFP here -- install an operator to permute the
757                // input, if necessary.
758                let input = if !keys.raw {
759                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
760                } else {
761                    input
762                };
763                // Return the plan, and no arrangements.
764                let lir_id = self.allocate_lir_id();
765                (
766                    PlanNode::Negate {
767                        input: Box::new(input),
768                    }
769                    .as_plan(lir_id),
770                    AvailableCollections::new_raw(),
771                )
772            }
773            MirRelationExpr::Threshold { input } => {
774                let (plan, keys) = self.lower_mir_expr(input)?;
775                let arity = input.arity();
776                let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
777                let plan = if !keys
778                    .arranged
779                    .iter()
780                    .any(|(key, _, _)| key == &required_arrangement.0)
781                {
782                    self.arrange_by(
783                        plan,
784                        AvailableCollections::new_arranged(vec![required_arrangement]),
785                        &keys,
786                        arity,
787                    )
788                } else {
789                    plan
790                };
791
792                let output_keys = threshold_plan.keys();
793                // Return the plan, and any produced keys.
794                let lir_id = self.allocate_lir_id();
795                (
796                    PlanNode::Threshold {
797                        input: Box::new(plan),
798                        threshold_plan,
799                    }
800                    .as_plan(lir_id),
801                    output_keys,
802                )
803            }
804            MirRelationExpr::Union { base, inputs } => {
805                let arity = base.arity();
806                let mut plans_keys = Vec::with_capacity(1 + inputs.len());
807                let (plan, keys) = self.lower_mir_expr(base)?;
808                plans_keys.push((plan, keys));
809                for input in inputs.iter() {
810                    let (plan, keys) = self.lower_mir_expr(input)?;
811                    plans_keys.push((plan, keys));
812                }
813                let plans = plans_keys
814                    .into_iter()
815                    .map(|(plan, keys)| {
816                        // We don't have an MFP here -- install an operator to permute the
817                        // input, if necessary.
818                        if !keys.raw {
819                            self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
820                        } else {
821                            plan
822                        }
823                    })
824                    .collect();
825                // Return the plan and no arrangements.
826                let lir_id = self.allocate_lir_id();
827                (
828                    PlanNode::Union {
829                        inputs: plans,
830                        consolidate_output: false,
831                    }
832                    .as_plan(lir_id),
833                    AvailableCollections::new_raw(),
834                )
835            }
836            MirRelationExpr::ArrangeBy { input, keys } => {
837                let input_mir = input;
838                let (input, mut input_keys) = self.lower_mir_expr(input)?;
839                // Fill the `types` in `input_keys` if not already present.
840                let arity = input_mir.arity();
841
842                // Determine keys that are not present in `input_keys`.
843                let new_keys = keys
844                    .iter()
845                    .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
846                    .cloned()
847                    .collect::<Vec<_>>();
848                if new_keys.is_empty() {
849                    (input, input_keys)
850                } else {
851                    let mut new_keys = new_keys
852                        .iter()
853                        .cloned()
854                        .map(|k| {
855                            let (permutation, thinning) = permutation_for_arrangement(&k, arity);
856                            (k, permutation, thinning)
857                        })
858                        .collect::<Vec<_>>();
859                    let forms = AvailableCollections {
860                        raw: input_keys.raw,
861                        arranged: new_keys.clone(),
862                    };
863                    let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
864                        input_keys.arbitrary_arrangement()
865                    {
866                        let mut mfp = MapFilterProject::new(arity);
867                        mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
868                        (Some(input_key.clone()), mfp)
869                    } else {
870                        (None, MapFilterProject::new(arity))
871                    };
872                    input_keys.arranged.append(&mut new_keys);
873                    input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
874
875                    // Return the plan and extended keys.
876                    let lir_id = self.allocate_lir_id();
877                    (
878                        PlanNode::ArrangeBy {
879                            input_key,
880                            input: Box::new(input),
881                            input_mfp,
882                            forms,
883                        }
884                        .as_plan(lir_id),
885                        input_keys,
886                    )
887                }
888            }
889        };
890
891        // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
892        if !mfp.is_identity() {
893            // Seek out an arrangement key that might be constrained to a literal.
894            // TODO: Improve key selection heuristic.
895            let key_val = keys
896                .arranged
897                .iter()
898                .filter_map(|(key, permutation, thinning)| {
899                    let mut mfp = mfp.clone();
900                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
901                    mfp.literal_constraints(key)
902                        .map(|val| (key.clone(), permutation, thinning, val))
903                })
904                .max_by_key(|(key, _, _, _)| key.len());
905
906            // Input key selection strategy:
907            // (1) If we can read a key at a particular value, do so
908            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
909            // therefore allows us to avoid discarding the arrangement, use that.
910            // (3) Otherwise, if there is _some_ key, use that,
911            // (4) Otherwise just read the raw collection.
912            let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
913                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
914
915                Some((key, Some(val)))
916            } else if let Some((key, permutation, thinning)) =
917                keys.arranged.iter().find(|(key, permutation, thinning)| {
918                    let mut mfp = mfp.clone();
919                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
920                    mfp.is_identity()
921                })
922            {
923                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
924                Some((key.clone(), None))
925            } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
926                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
927                Some((key.clone(), None))
928            } else {
929                None
930            };
931
932            if mfp.is_identity() {
933                // We have discovered a key
934                // whose permutation causes the MFP to actually
935                // be the identity! We can keep it around,
936                // but without its permutation this time,
937                // and with a trivial thinning of the right length.
938                let (key, val) = input_key_val.unwrap();
939                let (_old_key, old_permutation, old_thinning) = keys
940                    .arranged
941                    .iter_mut()
942                    .find(|(key2, _, _)| key2 == &key)
943                    .unwrap();
944                *old_permutation = (0..mfp.input_arity).collect();
945                let old_thinned_arity = old_thinning.len();
946                *old_thinning = (0..old_thinned_arity).collect();
947                // Get rid of all other forms, as this is now the only one known to be valid.
948                // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
949                // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
950                keys.arranged.retain(|(key2, _, _)| key2 == &key);
951                keys.raw = false;
952
953                // Creating a Plan::Mfp node is now logically unnecessary, but we
954                // should do so anyway when `val` is populated, so that
955                // the `key_val` optimization gets applied.
956                let lir_id = self.allocate_lir_id();
957                if val.is_some() {
958                    plan = PlanNode::Mfp {
959                        input: Box::new(plan),
960                        mfp,
961                        input_key_val: Some((key, val)),
962                    }
963                    .as_plan(lir_id)
964                }
965            } else {
966                let lir_id = self.allocate_lir_id();
967                plan = PlanNode::Mfp {
968                    input: Box::new(plan),
969                    mfp,
970                    input_key_val,
971                }
972                .as_plan(lir_id);
973                keys = AvailableCollections::new_raw();
974            }
975        }
976
977        Ok((plan, keys))
978    }
979
980    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
981    /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
982    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
983    /// fused.
984    fn lower_reduce<T: Timestamp>(
985        &mut self,
986        input: &MirRelationExpr,
987        group_key: &Vec<MirScalarExpr>,
988        aggregates: &Vec<AggregateExpr>,
989        monotonic: &bool,
990        expected_group_size: &Option<u64>,
991        mfp_on_top: &mut MapFilterProject,
992        fused_unnest_list: bool,
993    ) -> Result<(Plan<T>, AvailableCollections), String> {
994        let input_arity = input.arity();
995        let (input, keys) = self.lower_mir_expr(input)?;
996        let (input_key, permutation_and_new_arity) =
997            if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
998                (
999                    Some(input_key.clone()),
1000                    Some((permutation.clone(), thinning.len() + input_key.len())),
1001                )
1002            } else {
1003                (None, None)
1004            };
1005        let key_val_plan = KeyValPlan::new(
1006            input_arity,
1007            group_key,
1008            aggregates,
1009            permutation_and_new_arity,
1010        );
1011        let reduce_plan = ReducePlan::create_from(
1012            aggregates.clone(),
1013            *monotonic,
1014            *expected_group_size,
1015            fused_unnest_list,
1016        );
1017        // Return the plan, and the keys it produces.
1018        let mfp_after;
1019        let output_arity;
1020        if self.enable_reduce_mfp_fusion {
1021            (mfp_after, *mfp_on_top, output_arity) =
1022                reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1023        } else {
1024            (mfp_after, output_arity) = (
1025                MapFilterProject::new(mfp_on_top.input_arity),
1026                group_key.len() + aggregates.len(),
1027            );
1028        }
1029        soft_assert_eq_or_log!(
1030            mfp_on_top.input_arity,
1031            output_arity,
1032            "Output arity of reduce must match input arity for MFP on top of it"
1033        );
1034        let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1035        let lir_id = self.allocate_lir_id();
1036        Ok((
1037            PlanNode::Reduce {
1038                input_key,
1039                input: Box::new(input),
1040                key_val_plan,
1041                plan: reduce_plan,
1042                mfp_after,
1043            }
1044            .as_plan(lir_id),
1045            output_keys,
1046        ))
1047    }
1048
1049    /// Replace the plan with another one
1050    /// that has the collection in some additional forms.
1051    pub fn arrange_by<T>(
1052        &mut self,
1053        plan: Plan<T>,
1054        collections: AvailableCollections,
1055        old_collections: &AvailableCollections,
1056        arity: usize,
1057    ) -> Plan<T> {
1058        if let Plan {
1059            node:
1060                PlanNode::ArrangeBy {
1061                    input_key,
1062                    input,
1063                    input_mfp,
1064                    mut forms,
1065                },
1066            lir_id,
1067        } = plan
1068        {
1069            forms.raw |= collections.raw;
1070            forms.arranged.extend(collections.arranged);
1071            forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1072            forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1073            PlanNode::ArrangeBy {
1074                input_key,
1075                input,
1076                input_mfp,
1077                forms,
1078            }
1079            .as_plan(lir_id)
1080        } else {
1081            let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1082                old_collections.arbitrary_arrangement()
1083            {
1084                let mut mfp = MapFilterProject::new(arity);
1085                mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1086                (Some(input_key.clone()), mfp)
1087            } else {
1088                (None, MapFilterProject::new(arity))
1089            };
1090            let lir_id = self.allocate_lir_id();
1091
1092            PlanNode::ArrangeBy {
1093                input_key,
1094                input: Box::new(plan),
1095                input_mfp,
1096                forms: collections,
1097            }
1098            .as_plan(lir_id)
1099        }
1100    }
1101}
1102
1103/// Various bits of state to print along with error messages during LIR planning,
1104/// to aid debugging.
1105#[derive(Clone, Debug)]
1106pub struct LirDebugInfo {
1107    debug_name: String,
1108    id: GlobalId,
1109}
1110
1111impl std::fmt::Display for LirDebugInfo {
1112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1113        write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1114    }
1115}