mz_compute_types/plan/
lowering.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::BTreeMap;
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18    AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::GlobalId;
23use mz_repr::optimize::OptimizerFeatures;
24use timely::Container;
25use timely::progress::Timestamp;
26
27use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
28use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
29use crate::plan::reduce::{KeyValPlan, ReducePlan};
30use crate::plan::threshold::ThresholdPlan;
31use crate::plan::top_k::TopKPlan;
32use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
33
34pub(super) struct Context {
35    /// Known bindings to (possibly arranged) collections.
36    arrangements: BTreeMap<Id, AvailableCollections>,
37    /// Tracks the next available `LirId`.
38    next_lir_id: LirId,
39    /// Information to print along with error messages.
40    debug_info: LirDebugInfo,
41    /// Whether to enable fusion of MFPs in reductions.
42    enable_reduce_mfp_fusion: bool,
43}
44
45impl Context {
46    pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
47        Self {
48            arrangements: Default::default(),
49            next_lir_id: LirId(1),
50            debug_info: LirDebugInfo {
51                debug_name,
52                id: GlobalId::Transient(0),
53            },
54            enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
55        }
56    }
57
58    fn allocate_lir_id(&mut self) -> LirId {
59        let id = self.next_lir_id;
60        self.next_lir_id = LirId(
61            self.next_lir_id
62                .0
63                .checked_add(1)
64                .expect("No LirId overflow"),
65        );
66        id
67    }
68
69    pub fn lower<T: Timestamp>(
70        mut self,
71        desc: DataflowDescription<OptimizedMirRelationExpr>,
72    ) -> Result<DataflowDescription<Plan<T>>, String> {
73        // Sources might provide arranged forms of their data, in the future.
74        // Indexes provide arranged forms of their data.
75        for IndexImport {
76            desc: index_desc,
77            typ,
78            ..
79        } in desc.index_imports.values()
80        {
81            let key = index_desc.key.clone();
82            // TODO[btv] - We should be told the permutation by
83            // `index_desc`, and it should have been generated
84            // at the same point the thinning logic was.
85            //
86            // We should for sure do that soon, but it requires
87            // a bit of a refactor, so for now we just
88            // _assume_ that they were both generated by `permutation_for_arrangement`,
89            // and recover it here.
90            let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
91            let index_keys = self
92                .arrangements
93                .entry(Id::Global(index_desc.on_id))
94                .or_insert_with(AvailableCollections::default);
95            index_keys.arranged.push((key, permutation, thinning));
96        }
97        for id in desc.source_imports.keys() {
98            self.arrangements
99                .entry(Id::Global(*id))
100                .or_insert_with(AvailableCollections::new_raw);
101        }
102
103        // Build each object in order, registering the arrangements it forms.
104        let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
105        for build in desc.objects_to_build {
106            self.debug_info.id = build.id;
107            let (plan, keys) = self.lower_mir_expr(&build.plan)?;
108
109            self.arrangements.insert(Id::Global(build.id), keys);
110            objects_to_build.push(BuildDesc { id: build.id, plan });
111        }
112
113        Ok(DataflowDescription {
114            source_imports: desc.source_imports,
115            index_imports: desc.index_imports,
116            objects_to_build,
117            index_exports: desc.index_exports,
118            sink_exports: desc.sink_exports,
119            as_of: desc.as_of,
120            until: desc.until,
121            initial_storage_as_of: desc.initial_storage_as_of,
122            refresh_schedule: desc.refresh_schedule,
123            debug_name: desc.debug_name,
124            time_dependence: desc.time_dependence,
125        })
126    }
127
128    /// This method converts a MirRelationExpr into a plan that can be directly rendered.
129    ///
130    /// The rough structure is that we repeatedly extract map/filter/project operators
131    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
132    /// then produce a plan for the combination of that with the next operator.
133    ///
134    /// The method accesses `self.arrangements`, which it will locally add to and remove from for
135    /// `Let` bindings (by the end of the call it should contain the same bindings as when it
136    /// started).
137    ///
138    /// The result of the method is both a `Plan`, but also a list of arrangements that
139    /// are certain to be produced, which can be relied on by the next steps in the plan.
140    /// Each of the arrangement keys is associated with an MFP that must be applied if that
141    /// arrangement is used, to back out the permutation associated with that arrangement.
142    ///
143    /// An empty list of arrangement keys indicates that only a `Collection` stream can
144    /// be assumed to exist.
145    fn lower_mir_expr<T: Timestamp>(
146        &mut self,
147        expr: &MirRelationExpr,
148    ) -> Result<(Plan<T>, AvailableCollections), String> {
149        // This function is recursive and can overflow its stack, so grow it if
150        // needed. The growth here is unbounded. Our general solution for this problem
151        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
152        // depth. That however requires upstream error handling. This function is
153        // currently called by the Coordinator after calls to `catalog_transact`,
154        // and thus are not allowed to fail. Until that allows errors, we choose
155        // to allow the unbounded growth here. We are though somewhat protected by
156        // higher levels enforcing their own limits on stack depth (in the parser,
157        // transformer/desugarer, and planner).
158        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
159    }
160
161    fn lower_mir_expr_stack_safe<T>(
162        &mut self,
163        expr: &MirRelationExpr,
164    ) -> Result<(Plan<T>, AvailableCollections), String>
165    where
166        T: Timestamp,
167    {
168        // Extract a maximally large MapFilterProject from `expr`.
169        // We will then try and push this in to the resulting expression.
170        //
171        // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
172        // While we would eventually like all plan stages to be able to absorb such
173        // general operators, not all of them can.
174        let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
175        // We attempt to plan what we have remaining, in the context of `mfp`.
176        // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
177        let (mut plan, mut keys) = match expr {
178            // These operators should have been extracted from the expression.
179            MirRelationExpr::Map { .. } => {
180                panic!("This operator should have been extracted");
181            }
182            MirRelationExpr::Filter { .. } => {
183                panic!("This operator should have been extracted");
184            }
185            MirRelationExpr::Project { .. } => {
186                panic!("This operator should have been extracted");
187            }
188            // These operators may not have been extracted, and need to result in a `Plan`.
189            MirRelationExpr::Constant { rows, typ: _ } => {
190                let lir_id = self.allocate_lir_id();
191                let node = PlanNode::Constant {
192                    rows: rows.clone().map(|rows| {
193                        rows.into_iter()
194                            .map(|(row, diff)| (row, T::minimum(), diff))
195                            .collect()
196                    }),
197                };
198                // The plan, not arranged in any way.
199                (node.as_plan(lir_id), AvailableCollections::new_raw())
200            }
201            MirRelationExpr::Get { id, typ: _, .. } => {
202                // This stage can absorb arbitrary MFP operators.
203                let mut mfp = mfp.take();
204                // If `mfp` is the identity, we can surface all imported arrangements.
205                // Otherwise, we apply `mfp` and promise no arrangements.
206                let mut in_keys = self
207                    .arrangements
208                    .get(id)
209                    .cloned()
210                    .unwrap_or_else(AvailableCollections::new_raw);
211
212                // Seek out an arrangement key that might be constrained to a literal.
213                // Note: this code has very little use nowadays, as its job was mostly taken over
214                // by `LiteralConstraints` (see in the below longer comment).
215                let key_val = in_keys
216                    .arranged
217                    .iter()
218                    .filter_map(|key| {
219                        mfp.literal_constraints(&key.0)
220                            .map(|val| (key.clone(), val))
221                    })
222                    .max_by_key(|(key, _val)| key.0.len());
223
224                // Determine the plan of action for the `Get` stage.
225                let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
226                    // This code path used to handle looking up literals from indexes, but it's
227                    // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
228                    // MIR transform instead. However, it's still called in a couple of tricky
229                    // special cases:
230                    // - `LiteralConstraints` handles only Gets of global ids, so this code still
231                    //   gets to handle Filters on top of Gets of local ids.
232                    // - Lowering does a `MapFilterProject::extract_from_expression`, while
233                    //   `LiteralConstraints` does
234                    //   `MapFilterProject::extract_non_errors_from_expr_mut`.
235                    // - It might happen that new literal constraint optimization opportunities
236                    //   appear somewhere near the end of the MIR optimizer after
237                    //   `LiteralConstraints` has already run.
238                    // (Also note that a similar literal constraint handling machinery is also
239                    // present when handling the leftover MFP after this big match.)
240                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
241                    in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
242                    GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
243                } else if !mfp.is_identity() {
244                    // We need to ensure a collection exists, which means we must form it.
245                    if let Some((key, permutation, thinning)) =
246                        in_keys.arbitrary_arrangement().cloned()
247                    {
248                        mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
249                        in_keys.arranged = vec![(key.clone(), permutation, thinning)];
250                        GetPlan::Arrangement(key, None, mfp)
251                    } else {
252                        GetPlan::Collection(mfp)
253                    }
254                } else {
255                    // By default, just pass input arrangements through.
256                    GetPlan::PassArrangements
257                };
258
259                let out_keys = if let GetPlan::PassArrangements = plan {
260                    in_keys.clone()
261                } else {
262                    AvailableCollections::new_raw()
263                };
264
265                let lir_id = self.allocate_lir_id();
266                let node = PlanNode::Get {
267                    id: id.clone(),
268                    keys: in_keys,
269                    plan,
270                };
271                // Return the plan, and any keys if an identity `mfp`.
272                (node.as_plan(lir_id), out_keys)
273            }
274            MirRelationExpr::Let { id, value, body } => {
275                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
276                // that they would be pushed down. I am not sure if we should take the
277                // initiative to push down the `mfp` ourselves.
278
279                // Plan the value using only the initial arrangements, but
280                // introduce any resulting arrangements bound to `id`.
281                let (value, v_keys) = self.lower_mir_expr(value)?;
282                let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
283                assert_none!(pre_existing);
284                // Plan the body using initial and `value` arrangements,
285                // and then remove reference to the value arrangements.
286                let (body, b_keys) = self.lower_mir_expr(body)?;
287                self.arrangements.remove(&Id::Local(*id));
288                // Return the plan, and any `body` arrangements.
289                let lir_id = self.allocate_lir_id();
290                (
291                    PlanNode::Let {
292                        id: id.clone(),
293                        value: Box::new(value),
294                        body: Box::new(body),
295                    }
296                    .as_plan(lir_id),
297                    b_keys,
298                )
299            }
300            MirRelationExpr::LetRec {
301                ids,
302                values,
303                limits,
304                body,
305            } => {
306                assert_eq!(ids.len(), values.len());
307                assert_eq!(ids.len(), limits.len());
308                // Plan the values using only the available arrangements, but
309                // introduce any resulting arrangements bound to each `id`.
310                // Arrangements made available cannot be used by prior bindings,
311                // as we cannot circulate an arrangement through a `Variable` yet.
312                let mut lir_values = Vec::with_capacity(values.len());
313                for (id, value) in ids.iter().zip_eq(values) {
314                    let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
315                    // If `v_keys` does not contain an unarranged collection, we must form it.
316                    if !v_keys.raw {
317                        // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
318                        let (input_key, permutation, thinning) =
319                            v_keys.arbitrary_arrangement().unwrap();
320                        let mut input_mfp = MapFilterProject::new(value.arity());
321                        input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
322                        let input_key = Some(input_key.clone());
323
324                        let forms = AvailableCollections::new_raw();
325
326                        // We just want to insert an `ArrangeBy` to form an unarranged collection,
327                        // but there is a complication: We shouldn't break the invariant (created by
328                        // `NormalizeLets`, and relied upon by the rendering) that there isn't
329                        // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
330                        // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
331                        // instead of on top of the inner `LetRec`.
332                        lir_value = match lir_value {
333                            Plan {
334                                node:
335                                    PlanNode::LetRec {
336                                        ids,
337                                        values,
338                                        limits,
339                                        body,
340                                    },
341                                lir_id,
342                            } => {
343                                let inner_lir_id = self.allocate_lir_id();
344                                PlanNode::LetRec {
345                                    ids,
346                                    values,
347                                    limits,
348                                    body: Box::new(
349                                        PlanNode::ArrangeBy {
350                                            input_key,
351                                            input: body,
352                                            input_mfp,
353                                            forms,
354                                        }
355                                        .as_plan(inner_lir_id),
356                                    ),
357                                }
358                                .as_plan(lir_id)
359                            }
360                            lir_value => {
361                                let lir_id = self.allocate_lir_id();
362                                PlanNode::ArrangeBy {
363                                    input_key,
364                                    input: Box::new(lir_value),
365                                    input_mfp,
366                                    forms,
367                                }
368                                .as_plan(lir_id)
369                            }
370                        };
371                        v_keys.raw = true;
372                    }
373                    let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
374                    assert_none!(pre_existing);
375                    lir_values.push(lir_value);
376                }
377                // As we exit the iterative scope, we must leave all arrangements behind,
378                // as they reference a timestamp coordinate that must be stripped off.
379                for id in ids.iter() {
380                    self.arrangements
381                        .insert(Id::Local(*id), AvailableCollections::new_raw());
382                }
383                // Plan the body using initial and `value` arrangements,
384                // and then remove reference to the value arrangements.
385                let (body, b_keys) = self.lower_mir_expr(body)?;
386                for id in ids.iter() {
387                    self.arrangements.remove(&Id::Local(*id));
388                }
389                // Return the plan, and any `body` arrangements.
390                let lir_id = self.allocate_lir_id();
391                (
392                    PlanNode::LetRec {
393                        ids: ids.clone(),
394                        values: lir_values,
395                        limits: limits.clone(),
396                        body: Box::new(body),
397                    }
398                    .as_plan(lir_id),
399                    b_keys,
400                )
401            }
402            MirRelationExpr::FlatMap {
403                input: flat_map_input,
404                func,
405                exprs,
406            } => {
407                // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
408                // fused into the lowered `Reduce`.
409                //
410                // In theory, we could have implemented this also as an MIR transform. However, this
411                // is more of a physical optimization, which are sometimes unpleasant to make a part
412                // of the MIR pipeline. The specific problem here with putting this into the MIR
413                // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
414                // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
415                // emit multiple rows per group. Such semantic changes of MIR are very scary, since
416                // various parts of the optimizer assume that Reduce emits only 1 row per group, and
417                // it would be very hard to hunt down all these parts. (For example, key inference
418                // infers the group key as a unique key.)
419                let fused_with_reduce = 'fusion: {
420                    if !matches!(func, TableFunc::UnnestList { .. }) {
421                        break 'fusion None;
422                    }
423                    // We might have a Project of a single col between the FlatMap and the
424                    // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
425                    // result of the window function.)
426                    let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
427                        input: project_input,
428                        outputs: projection,
429                    } = &**flat_map_input
430                    {
431                        // We want this to be a single column, because we'll want to deal with only
432                        // one aggregation in the `Reduce`. (The aggregation of a window function
433                        // always stands alone currently: we plan them separately from other
434                        // aggregations, and Reduces are never fused. When window functions are
435                        // fused with each other, they end up in one aggregation. When there are
436                        // multiple window functions in the same SELECT, but can't be fused, they
437                        // end up in different Reduces.)
438                        if let &[single_col] = &**projection {
439                            (project_input, single_col)
440                        } else {
441                            break 'fusion None;
442                        }
443                    } else {
444                        (flat_map_input, 0)
445                    };
446                    if let MirRelationExpr::Reduce {
447                        input,
448                        group_key,
449                        aggregates,
450                        monotonic,
451                        expected_group_size,
452                    } = &**maybe_reduce
453                    {
454                        if group_key.len() != num_grouping_keys
455                            || aggregates.len() != 1
456                            || !aggregates[0].func.can_fuse_with_unnest_list()
457                        {
458                            break 'fusion None;
459                        }
460                        // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
461                        // above the FlatMap. Later, we'll mutate this to be the residual MFP that
462                        // didn't get fused into the `Reduce`.
463                        let non_fused_mfp_above_flat_map = &mut mfp;
464                        let reduce_output_arity = num_grouping_keys + 1;
465                        // We are fusing away the list that the FlatMap would have been unnesting,
466                        // so the column that had that list disappears, so we have to permute the
467                        // MFP above the FlatMap with this column disappearance.
468                        let tweaked_mfp = {
469                            let mut mfp = non_fused_mfp_above_flat_map.clone();
470                            if mfp.demand().contains(&0) {
471                                // I don't think this can happen currently that this MFP would
472                                // refer to the list column, because both the list column and the
473                                // MFP were constructed by the HIR-to-MIR lowering, so it's not just
474                                // some random MFP that we are seeing here. But anyhow, it's better
475                                // to check this here for robustness against future code changes.
476                                break 'fusion None;
477                            }
478                            let permutation: BTreeMap<_, _> =
479                                (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
480                            mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
481                            mfp
482                        };
483                        // We now put together the project that was before the FlatMap, and the
484                        // tweaked version of the MFP that was after the FlatMap.
485                        // (Part of this MFP might be fused into the Reduce.)
486                        let mut project_and_tweaked_mfp = {
487                            let mut mfp = MapFilterProject::new(reduce_output_arity);
488                            mfp = mfp.project(vec![num_grouping_keys]);
489                            mfp = MapFilterProject::compose(mfp, tweaked_mfp);
490                            mfp
491                        };
492                        let fused = self.lower_reduce(
493                            input,
494                            group_key,
495                            aggregates,
496                            monotonic,
497                            expected_group_size,
498                            &mut project_and_tweaked_mfp,
499                            true,
500                        )?;
501                        // Update the residual MFP.
502                        *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
503                        Some(fused)
504                    } else {
505                        break 'fusion None;
506                    }
507                };
508                if let Some(fused_with_reduce) = fused_with_reduce {
509                    fused_with_reduce
510                } else {
511                    // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
512                    let (input, keys) = self.lower_mir_expr(flat_map_input)?;
513                    // This stage can absorb arbitrary MFP instances.
514                    let mfp = mfp.take();
515                    let mut exprs = exprs.clone();
516                    let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
517                    {
518                        // We don't permute the MFP here, because it runs _after_ the table function,
519                        // whose output is in a fixed order.
520                        //
521                        // We _do_, however, need to permute the `expr`s that provide input to the
522                        // `func`.
523                        let permutation = permutation.iter().cloned().enumerate().collect();
524                        for expr in &mut exprs {
525                            expr.permute_map(&permutation);
526                        }
527
528                        Some(k.clone())
529                    } else {
530                        None
531                    };
532
533                    let lir_id = self.allocate_lir_id();
534                    // Return the plan, and no arrangements.
535                    (
536                        PlanNode::FlatMap {
537                            input_key,
538                            input: Box::new(input),
539                            exprs: exprs.clone(),
540                            func: func.clone(),
541                            mfp_after: mfp,
542                        }
543                        .as_plan(lir_id),
544                        AvailableCollections::new_raw(),
545                    )
546                }
547            }
548            MirRelationExpr::Join {
549                inputs,
550                equivalences,
551                implementation,
552            } => {
553                // Plan each of the join inputs independently.
554                // The `plans` get surfaced upwards, and the `input_keys` should
555                // be used as part of join planning / to validate the existing
556                // plans / to aid in indexed seeding of update streams.
557                let mut plans = Vec::new();
558                let mut input_keys = Vec::new();
559                let mut input_arities = Vec::new();
560                for input in inputs.iter() {
561                    let (plan, keys) = self.lower_mir_expr(input)?;
562                    input_arities.push(input.arity());
563                    plans.push(plan);
564                    input_keys.push(keys);
565                }
566
567                let input_mapper =
568                    JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
569
570                // Extract temporal predicates as joins cannot currently absorb them.
571                let (plan, missing) = match implementation {
572                    IndexedFilter(_coll_id, _idx_id, key, _val) => {
573                        // Start with the constant input. (This used to be important before database-issues#4016
574                        // was fixed.)
575                        let start: usize = 1;
576                        let order = vec![(0usize, key.clone(), None)];
577                        // All columns of the constant input will be part of the arrangement key.
578                        let source_arrangement = (
579                            (0..key.len())
580                                .map(MirScalarExpr::column)
581                                .collect::<Vec<_>>(),
582                            (0..key.len()).collect::<Vec<_>>(),
583                            Vec::<usize>::new(),
584                        );
585                        let (ljp, missing) = LinearJoinPlan::create_from(
586                            start,
587                            Some(&source_arrangement),
588                            equivalences,
589                            &order,
590                            input_mapper,
591                            &mut mfp,
592                            &input_keys,
593                        );
594                        (JoinPlan::Linear(ljp), missing)
595                    }
596                    Differential((start, start_arr, _start_characteristic), order) => {
597                        let source_arrangement = start_arr.as_ref().and_then(|key| {
598                            input_keys[*start]
599                                .arranged
600                                .iter()
601                                .find(|(k, _, _)| k == key)
602                                .clone()
603                        });
604                        let (ljp, missing) = LinearJoinPlan::create_from(
605                            *start,
606                            source_arrangement,
607                            equivalences,
608                            order,
609                            input_mapper,
610                            &mut mfp,
611                            &input_keys,
612                        );
613                        (JoinPlan::Linear(ljp), missing)
614                    }
615                    DeltaQuery(orders) => {
616                        let (djp, missing) = DeltaJoinPlan::create_from(
617                            equivalences,
618                            orders,
619                            input_mapper,
620                            &mut mfp,
621                            &input_keys,
622                        );
623                        (JoinPlan::Delta(djp), missing)
624                    }
625                    // Other plans are errors, and should be reported as such.
626                    Unimplemented => return Err("unimplemented join".to_string()),
627                };
628                // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
629                // `missing`. We thus need to plan them here so they'll exist.
630                let is_delta = matches!(plan, JoinPlan::Delta(_));
631                for (((input_plan, input_keys), missing), arity) in plans
632                    .iter_mut()
633                    .zip_eq(input_keys.iter())
634                    .zip_eq(missing.into_iter())
635                    .zip_eq(input_arities.iter().cloned())
636                {
637                    if missing != Default::default() {
638                        if is_delta {
639                            // join_implementation.rs produced a sub-optimal plan here;
640                            // we shouldn't plan delta joins at all if not all of the required
641                            // arrangements are available. Soft panic in CI and log an error in
642                            // production to increase the chances that we will catch all situations
643                            // that violate this constraint.
644                            soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
645Dataflow info: {}
646This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
647                        } else {
648                            soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
649Dataflow info: {}
650This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
651                            // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
652                            // Join input. (Earlier, they were missing in the following cases:
653                            //  - They were const-folded away for constant inputs. This is not
654                            //    happening since
655                            //    https://github.com/MaterializeInc/materialize/pull/16351
656                            //  - They were not being inserted for the constant input of
657                            //    `IndexedFilter`s. This was fixed in
658                            //    https://github.com/MaterializeInc/materialize/pull/20920
659                            //  - They were not being inserted for the first input of Differential
660                            //    joins. This was fixed in
661                            //    https://github.com/MaterializeInc/materialize/pull/16099)
662                        }
663                        let lir_id = self.allocate_lir_id();
664                        let raw_plan = std::mem::replace(
665                            input_plan,
666                            PlanNode::Constant {
667                                rows: Ok(Vec::new()),
668                            }
669                            .as_plan(lir_id),
670                        );
671                        *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
672                    }
673                }
674                // Return the plan, and no arrangements.
675                let lir_id = self.allocate_lir_id();
676                (
677                    PlanNode::Join {
678                        inputs: plans,
679                        plan,
680                    }
681                    .as_plan(lir_id),
682                    AvailableCollections::new_raw(),
683                )
684            }
685            MirRelationExpr::Reduce {
686                input,
687                group_key,
688                aggregates,
689                monotonic,
690                expected_group_size,
691            } => {
692                if aggregates
693                    .iter()
694                    .any(|agg| agg.func.can_fuse_with_unnest_list())
695                {
696                    // This case should have been handled at the `MirRelationExpr::FlatMap` case
697                    // above. But that has a pretty complicated pattern matching, so it's not
698                    // unthinkable that it fails.
699                    soft_panic_or_log!(
700                        "Window function performance issue: `reduce_unnest_list_fusion` failed"
701                    );
702                }
703                self.lower_reduce(
704                    input,
705                    group_key,
706                    aggregates,
707                    monotonic,
708                    expected_group_size,
709                    &mut mfp,
710                    false,
711                )?
712            }
713            MirRelationExpr::TopK {
714                input,
715                group_key,
716                order_key,
717                limit,
718                offset,
719                monotonic,
720                expected_group_size,
721            } => {
722                let arity = input.arity();
723                let (input, keys) = self.lower_mir_expr(input)?;
724
725                let top_k_plan = TopKPlan::create_from(
726                    group_key.clone(),
727                    order_key.clone(),
728                    *offset,
729                    limit.clone(),
730                    arity,
731                    *monotonic,
732                    *expected_group_size,
733                );
734
735                // We don't have an MFP here -- install an operator to permute the
736                // input, if necessary.
737                let input = if !keys.raw {
738                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
739                } else {
740                    input
741                };
742                // Return the plan, and no arrangements.
743                let lir_id = self.allocate_lir_id();
744                (
745                    PlanNode::TopK {
746                        input: Box::new(input),
747                        top_k_plan,
748                    }
749                    .as_plan(lir_id),
750                    AvailableCollections::new_raw(),
751                )
752            }
753            MirRelationExpr::Negate { input } => {
754                let arity = input.arity();
755                let (input, keys) = self.lower_mir_expr(input)?;
756
757                // We don't have an MFP here -- install an operator to permute the
758                // input, if necessary.
759                let input = if !keys.raw {
760                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
761                } else {
762                    input
763                };
764                // Return the plan, and no arrangements.
765                let lir_id = self.allocate_lir_id();
766                (
767                    PlanNode::Negate {
768                        input: Box::new(input),
769                    }
770                    .as_plan(lir_id),
771                    AvailableCollections::new_raw(),
772                )
773            }
774            MirRelationExpr::Threshold { input } => {
775                let (plan, keys) = self.lower_mir_expr(input)?;
776                let arity = input.arity();
777                let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
778                let plan = if !keys
779                    .arranged
780                    .iter()
781                    .any(|(key, _, _)| key == &required_arrangement.0)
782                {
783                    self.arrange_by(
784                        plan,
785                        AvailableCollections::new_arranged(vec![required_arrangement]),
786                        &keys,
787                        arity,
788                    )
789                } else {
790                    plan
791                };
792
793                let output_keys = threshold_plan.keys();
794                // Return the plan, and any produced keys.
795                let lir_id = self.allocate_lir_id();
796                (
797                    PlanNode::Threshold {
798                        input: Box::new(plan),
799                        threshold_plan,
800                    }
801                    .as_plan(lir_id),
802                    output_keys,
803                )
804            }
805            MirRelationExpr::Union { base, inputs } => {
806                let arity = base.arity();
807                let mut plans_keys = Vec::with_capacity(1 + inputs.len());
808                let (plan, keys) = self.lower_mir_expr(base)?;
809                plans_keys.push((plan, keys));
810                for input in inputs.iter() {
811                    let (plan, keys) = self.lower_mir_expr(input)?;
812                    plans_keys.push((plan, keys));
813                }
814                let plans = plans_keys
815                    .into_iter()
816                    .map(|(plan, keys)| {
817                        // We don't have an MFP here -- install an operator to permute the
818                        // input, if necessary.
819                        if !keys.raw {
820                            self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
821                        } else {
822                            plan
823                        }
824                    })
825                    .collect();
826                // Return the plan and no arrangements.
827                let lir_id = self.allocate_lir_id();
828                (
829                    PlanNode::Union {
830                        inputs: plans,
831                        consolidate_output: false,
832                    }
833                    .as_plan(lir_id),
834                    AvailableCollections::new_raw(),
835                )
836            }
837            MirRelationExpr::ArrangeBy { input, keys } => {
838                let input_mir = input;
839                let (input, mut input_keys) = self.lower_mir_expr(input)?;
840                // Fill the `types` in `input_keys` if not already present.
841                let arity = input_mir.arity();
842
843                // Determine keys that are not present in `input_keys`.
844                let new_keys = keys
845                    .iter()
846                    .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
847                    .cloned()
848                    .collect::<Vec<_>>();
849                if new_keys.is_empty() {
850                    (input, input_keys)
851                } else {
852                    let mut new_keys = new_keys
853                        .iter()
854                        .cloned()
855                        .map(|k| {
856                            let (permutation, thinning) = permutation_for_arrangement(&k, arity);
857                            (k, permutation, thinning)
858                        })
859                        .collect::<Vec<_>>();
860                    let forms = AvailableCollections {
861                        raw: input_keys.raw,
862                        arranged: new_keys.clone(),
863                    };
864                    let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
865                        input_keys.arbitrary_arrangement()
866                    {
867                        let mut mfp = MapFilterProject::new(arity);
868                        mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
869                        (Some(input_key.clone()), mfp)
870                    } else {
871                        (None, MapFilterProject::new(arity))
872                    };
873                    input_keys.arranged.append(&mut new_keys);
874                    input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
875
876                    // Return the plan and extended keys.
877                    let lir_id = self.allocate_lir_id();
878                    (
879                        PlanNode::ArrangeBy {
880                            input_key,
881                            input: Box::new(input),
882                            input_mfp,
883                            forms,
884                        }
885                        .as_plan(lir_id),
886                        input_keys,
887                    )
888                }
889            }
890        };
891
892        // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
893        if !mfp.is_identity() {
894            // Seek out an arrangement key that might be constrained to a literal.
895            // TODO: Improve key selection heuristic.
896            let key_val = keys
897                .arranged
898                .iter()
899                .filter_map(|(key, permutation, thinning)| {
900                    let mut mfp = mfp.clone();
901                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
902                    mfp.literal_constraints(key)
903                        .map(|val| (key.clone(), permutation, thinning, val))
904                })
905                .max_by_key(|(key, _, _, _)| key.len());
906
907            // Input key selection strategy:
908            // (1) If we can read a key at a particular value, do so
909            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
910            // therefore allows us to avoid discarding the arrangement, use that.
911            // (3) Otherwise, if there is _some_ key, use that,
912            // (4) Otherwise just read the raw collection.
913            let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
914                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
915
916                Some((key, Some(val)))
917            } else if let Some((key, permutation, thinning)) =
918                keys.arranged.iter().find(|(key, permutation, thinning)| {
919                    let mut mfp = mfp.clone();
920                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
921                    mfp.is_identity()
922                })
923            {
924                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
925                Some((key.clone(), None))
926            } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
927                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
928                Some((key.clone(), None))
929            } else {
930                None
931            };
932
933            if mfp.is_identity() {
934                // We have discovered a key
935                // whose permutation causes the MFP to actually
936                // be the identity! We can keep it around,
937                // but without its permutation this time,
938                // and with a trivial thinning of the right length.
939                let (key, val) = input_key_val.unwrap();
940                let (_old_key, old_permutation, old_thinning) = keys
941                    .arranged
942                    .iter_mut()
943                    .find(|(key2, _, _)| key2 == &key)
944                    .unwrap();
945                *old_permutation = (0..mfp.input_arity).collect();
946                let old_thinned_arity = old_thinning.len();
947                *old_thinning = (0..old_thinned_arity).collect();
948                // Get rid of all other forms, as this is now the only one known to be valid.
949                // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
950                // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
951                keys.arranged.retain(|(key2, _, _)| key2 == &key);
952                keys.raw = false;
953
954                // Creating a Plan::Mfp node is now logically unnecessary, but we
955                // should do so anyway when `val` is populated, so that
956                // the `key_val` optimization gets applied.
957                let lir_id = self.allocate_lir_id();
958                if val.is_some() {
959                    plan = PlanNode::Mfp {
960                        input: Box::new(plan),
961                        mfp,
962                        input_key_val: Some((key, val)),
963                    }
964                    .as_plan(lir_id)
965                }
966            } else {
967                let lir_id = self.allocate_lir_id();
968                plan = PlanNode::Mfp {
969                    input: Box::new(plan),
970                    mfp,
971                    input_key_val,
972                }
973                .as_plan(lir_id);
974                keys = AvailableCollections::new_raw();
975            }
976        }
977
978        Ok((plan, keys))
979    }
980
981    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
982    /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
983    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
984    /// fused.
985    fn lower_reduce<T: Timestamp>(
986        &mut self,
987        input: &MirRelationExpr,
988        group_key: &Vec<MirScalarExpr>,
989        aggregates: &Vec<AggregateExpr>,
990        monotonic: &bool,
991        expected_group_size: &Option<u64>,
992        mfp_on_top: &mut MapFilterProject,
993        fused_unnest_list: bool,
994    ) -> Result<(Plan<T>, AvailableCollections), String> {
995        let input_arity = input.arity();
996        let (input, keys) = self.lower_mir_expr(input)?;
997        let (input_key, permutation_and_new_arity) =
998            if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
999                (
1000                    Some(input_key.clone()),
1001                    Some((permutation.clone(), thinning.len() + input_key.len())),
1002                )
1003            } else {
1004                (None, None)
1005            };
1006        let key_val_plan = KeyValPlan::new(
1007            input_arity,
1008            group_key,
1009            aggregates,
1010            permutation_and_new_arity,
1011        );
1012        let reduce_plan = ReducePlan::create_from(
1013            aggregates.clone(),
1014            *monotonic,
1015            *expected_group_size,
1016            fused_unnest_list,
1017        );
1018        // Return the plan, and the keys it produces.
1019        let mfp_after;
1020        let output_arity;
1021        if self.enable_reduce_mfp_fusion {
1022            (mfp_after, *mfp_on_top, output_arity) =
1023                reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1024        } else {
1025            (mfp_after, output_arity) = (
1026                MapFilterProject::new(mfp_on_top.input_arity),
1027                group_key.len() + aggregates.len(),
1028            );
1029        }
1030        soft_assert_eq_or_log!(
1031            mfp_on_top.input_arity,
1032            output_arity,
1033            "Output arity of reduce must match input arity for MFP on top of it"
1034        );
1035        let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1036        let lir_id = self.allocate_lir_id();
1037        Ok((
1038            PlanNode::Reduce {
1039                input_key,
1040                input: Box::new(input),
1041                key_val_plan,
1042                plan: reduce_plan,
1043                mfp_after,
1044            }
1045            .as_plan(lir_id),
1046            output_keys,
1047        ))
1048    }
1049
1050    /// Replace the plan with another one
1051    /// that has the collection in some additional forms.
1052    pub fn arrange_by<T>(
1053        &mut self,
1054        plan: Plan<T>,
1055        collections: AvailableCollections,
1056        old_collections: &AvailableCollections,
1057        arity: usize,
1058    ) -> Plan<T> {
1059        if let Plan {
1060            node:
1061                PlanNode::ArrangeBy {
1062                    input_key,
1063                    input,
1064                    input_mfp,
1065                    mut forms,
1066                },
1067            lir_id,
1068        } = plan
1069        {
1070            forms.raw |= collections.raw;
1071            forms.arranged.extend(collections.arranged);
1072            forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1073            forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1074            PlanNode::ArrangeBy {
1075                input_key,
1076                input,
1077                input_mfp,
1078                forms,
1079            }
1080            .as_plan(lir_id)
1081        } else {
1082            let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1083                old_collections.arbitrary_arrangement()
1084            {
1085                let mut mfp = MapFilterProject::new(arity);
1086                mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1087                (Some(input_key.clone()), mfp)
1088            } else {
1089                (None, MapFilterProject::new(arity))
1090            };
1091            let lir_id = self.allocate_lir_id();
1092
1093            PlanNode::ArrangeBy {
1094                input_key,
1095                input: Box::new(plan),
1096                input_mfp,
1097                forms: collections,
1098            }
1099            .as_plan(lir_id)
1100        }
1101    }
1102}
1103
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
///
/// Rendered via its `Display` impl as `Debug name: <debug_name>; id: <id>`. It is embedded
/// in the `soft_panic_or_log!` messages emitted while lowering joins.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Human-readable name for the plan being lowered. It is presumably the `debug_name`
    /// passed to `Context::new`; confirm at the construction site.
    debug_name: String,
    /// `GlobalId` printed alongside the name. NOTE(review): which object this identifies
    /// (e.g. the dataflow export being planned) is not visible here; confirm at the
    /// construction site.
    id: GlobalId,
}
1111
1112impl std::fmt::Display for LirDebugInfo {
1113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1114        write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1115    }
1116}