Skip to main content

mz_compute_types/plan/
lowering.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::BTreeMap;
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18    AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::optimize::OptimizerFeatures;
23use mz_repr::{GlobalId, Timestamp};
24
25use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
26use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
27use crate::plan::reduce::{KeyValPlan, ReducePlan};
28use crate::plan::threshold::ThresholdPlan;
29use crate::plan::top_k::TopKPlan;
30use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
31
32pub(super) struct Context {
33    /// Known bindings to (possibly arranged) collections.
34    arrangements: BTreeMap<Id, AvailableCollections>,
35    /// Tracks the next available `LirId`.
36    next_lir_id: LirId,
37    /// Information to print along with error messages.
38    debug_info: LirDebugInfo,
39    /// Whether to enable fusion of MFPs in reductions.
40    enable_reduce_mfp_fusion: bool,
41}
42
43impl Context {
44    pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
45        Self {
46            arrangements: Default::default(),
47            next_lir_id: LirId(1),
48            debug_info: LirDebugInfo {
49                debug_name,
50                id: GlobalId::Transient(0),
51            },
52            enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
53        }
54    }
55
56    fn allocate_lir_id(&mut self) -> LirId {
57        let id = self.next_lir_id;
58        self.next_lir_id = LirId(
59            self.next_lir_id
60                .0
61                .checked_add(1)
62                .expect("No LirId overflow"),
63        );
64        id
65    }
66
67    pub fn lower(
68        mut self,
69        desc: DataflowDescription<OptimizedMirRelationExpr>,
70    ) -> Result<DataflowDescription<Plan>, String> {
71        // Sources might provide arranged forms of their data, in the future.
72        // Indexes provide arranged forms of their data.
73        for IndexImport {
74            desc: index_desc,
75            typ,
76            ..
77        } in desc.index_imports.values()
78        {
79            let key = index_desc.key.clone();
80            // TODO[btv] - We should be told the permutation by
81            // `index_desc`, and it should have been generated
82            // at the same point the thinning logic was.
83            //
84            // We should for sure do that soon, but it requires
85            // a bit of a refactor, so for now we just
86            // _assume_ that they were both generated by `permutation_for_arrangement`,
87            // and recover it here.
88            let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
89            let index_keys = self
90                .arrangements
91                .entry(Id::Global(index_desc.on_id))
92                .or_insert_with(AvailableCollections::default);
93            index_keys.arranged.push((key, permutation, thinning));
94        }
95        for id in desc.source_imports.keys() {
96            self.arrangements
97                .entry(Id::Global(*id))
98                .or_insert_with(AvailableCollections::new_raw);
99        }
100
101        // Build each object in order, registering the arrangements it forms.
102        let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
103        for build in desc.objects_to_build {
104            self.debug_info.id = build.id;
105            let (plan, keys) = self.lower_mir_expr(&build.plan)?;
106
107            self.arrangements.insert(Id::Global(build.id), keys);
108            objects_to_build.push(BuildDesc { id: build.id, plan });
109        }
110
111        Ok(DataflowDescription {
112            source_imports: desc.source_imports,
113            index_imports: desc.index_imports,
114            objects_to_build,
115            index_exports: desc.index_exports,
116            sink_exports: desc.sink_exports,
117            as_of: desc.as_of,
118            until: desc.until,
119            initial_storage_as_of: desc.initial_storage_as_of,
120            refresh_schedule: desc.refresh_schedule,
121            debug_name: desc.debug_name,
122            time_dependence: desc.time_dependence,
123        })
124    }
125
126    /// This method converts a MirRelationExpr into a plan that can be directly rendered.
127    ///
128    /// The rough structure is that we repeatedly extract map/filter/project operators
129    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
130    /// then produce a plan for the combination of that with the next operator.
131    ///
132    /// The method accesses `self.arrangements`, which it will locally add to and remove from for
133    /// `Let` bindings (by the end of the call it should contain the same bindings as when it
134    /// started).
135    ///
136    /// The result of the method is both a `Plan`, but also a list of arrangements that
137    /// are certain to be produced, which can be relied on by the next steps in the plan.
138    /// Each of the arrangement keys is associated with an MFP that must be applied if that
139    /// arrangement is used, to back out the permutation associated with that arrangement.
140    ///
141    /// An empty list of arrangement keys indicates that only a `Collection` stream can
142    /// be assumed to exist.
143    fn lower_mir_expr(
144        &mut self,
145        expr: &MirRelationExpr,
146    ) -> Result<(Plan, AvailableCollections), String> {
147        // This function is recursive and can overflow its stack, so grow it if
148        // needed. The growth here is unbounded. Our general solution for this problem
149        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
150        // depth. That however requires upstream error handling. This function is
151        // currently called by the Coordinator after calls to `catalog_transact`,
152        // and thus are not allowed to fail. Until that allows errors, we choose
153        // to allow the unbounded growth here. We are though somewhat protected by
154        // higher levels enforcing their own limits on stack depth (in the parser,
155        // transformer/desugarer, and planner).
156        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
157    }
158
159    fn lower_mir_expr_stack_safe(
160        &mut self,
161        expr: &MirRelationExpr,
162    ) -> Result<(Plan, AvailableCollections), String> {
163        // Extract a maximally large MapFilterProject from `expr`.
164        // We will then try and push this in to the resulting expression.
165        //
166        // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
167        // While we would eventually like all plan stages to be able to absorb such
168        // general operators, not all of them can.
169        let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
170        // We attempt to plan what we have remaining, in the context of `mfp`.
171        // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
172        let (mut plan, mut keys) = match expr {
173            // These operators should have been extracted from the expression.
174            MirRelationExpr::Map { .. } => {
175                panic!("This operator should have been extracted");
176            }
177            MirRelationExpr::Filter { .. } => {
178                panic!("This operator should have been extracted");
179            }
180            MirRelationExpr::Project { .. } => {
181                panic!("This operator should have been extracted");
182            }
183            // These operators may not have been extracted, and need to result in a `Plan`.
184            MirRelationExpr::Constant { rows, typ: _ } => {
185                let lir_id = self.allocate_lir_id();
186                let node = PlanNode::Constant {
187                    rows: rows.clone().map(|rows| {
188                        rows.into_iter()
189                            .map(|(row, diff)| (row, Timestamp::MIN, diff))
190                            .collect()
191                    }),
192                };
193                // The plan, not arranged in any way.
194                (node.as_plan(lir_id), AvailableCollections::new_raw())
195            }
196            MirRelationExpr::Get { id, typ: _, .. } => {
197                // This stage can absorb arbitrary MFP operators.
198                let mut mfp = mfp.take();
199                // If `mfp` is the identity, we can surface all imported arrangements.
200                // Otherwise, we apply `mfp` and promise no arrangements.
201                let mut in_keys = self
202                    .arrangements
203                    .get(id)
204                    .cloned()
205                    .unwrap_or_else(AvailableCollections::new_raw);
206
207                // Seek out an arrangement key that might be constrained to a literal.
208                // Note: this code has very little use nowadays, as its job was mostly taken over
209                // by `LiteralConstraints` (see in the below longer comment).
210                let key_val = in_keys
211                    .arranged
212                    .iter()
213                    .filter_map(|key| {
214                        mfp.literal_constraints(&key.0)
215                            .map(|val| (key.clone(), val))
216                    })
217                    .max_by_key(|(key, _val)| key.0.len());
218
219                // Determine the plan of action for the `Get` stage.
220                let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
221                    // This code path used to handle looking up literals from indexes, but it's
222                    // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
223                    // MIR transform instead. However, it's still called in a couple of tricky
224                    // special cases:
225                    // - `LiteralConstraints` handles only Gets of global ids, so this code still
226                    //   gets to handle Filters on top of Gets of local ids.
227                    // - Lowering does a `MapFilterProject::extract_from_expression`, while
228                    //   `LiteralConstraints` does
229                    //   `MapFilterProject::extract_non_errors_from_expr_mut`.
230                    // - It might happen that new literal constraint optimization opportunities
231                    //   appear somewhere near the end of the MIR optimizer after
232                    //   `LiteralConstraints` has already run.
233                    // (Also note that a similar literal constraint handling machinery is also
234                    // present when handling the leftover MFP after this big match.)
235                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
236                    in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
237                    GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
238                } else if !mfp.is_identity() {
239                    // We need to ensure a collection exists, which means we must form it.
240                    if let Some((key, permutation, thinning)) =
241                        in_keys.arbitrary_arrangement().cloned()
242                    {
243                        mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
244                        in_keys.arranged = vec![(key.clone(), permutation, thinning)];
245                        GetPlan::Arrangement(key, None, mfp)
246                    } else {
247                        GetPlan::Collection(mfp)
248                    }
249                } else {
250                    // By default, just pass input arrangements through.
251                    GetPlan::PassArrangements
252                };
253
254                let out_keys = if let GetPlan::PassArrangements = plan {
255                    in_keys.clone()
256                } else {
257                    AvailableCollections::new_raw()
258                };
259
260                let lir_id = self.allocate_lir_id();
261                let node = PlanNode::Get {
262                    id: id.clone(),
263                    keys: in_keys,
264                    plan,
265                };
266                // Return the plan, and any keys if an identity `mfp`.
267                (node.as_plan(lir_id), out_keys)
268            }
269            MirRelationExpr::Let { id, value, body } => {
270                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
271                // that they would be pushed down. I am not sure if we should take the
272                // initiative to push down the `mfp` ourselves.
273
274                // Plan the value using only the initial arrangements, but
275                // introduce any resulting arrangements bound to `id`.
276                let (value, v_keys) = self.lower_mir_expr(value)?;
277                let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
278                assert_none!(pre_existing);
279                // Plan the body using initial and `value` arrangements,
280                // and then remove reference to the value arrangements.
281                let (body, b_keys) = self.lower_mir_expr(body)?;
282                self.arrangements.remove(&Id::Local(*id));
283                // Return the plan, and any `body` arrangements.
284                let lir_id = self.allocate_lir_id();
285                (
286                    PlanNode::Let {
287                        id: id.clone(),
288                        value: Box::new(value),
289                        body: Box::new(body),
290                    }
291                    .as_plan(lir_id),
292                    b_keys,
293                )
294            }
295            MirRelationExpr::LetRec {
296                ids,
297                values,
298                limits,
299                body,
300            } => {
301                assert_eq!(ids.len(), values.len());
302                assert_eq!(ids.len(), limits.len());
303                // Plan the values using only the available arrangements, but
304                // introduce any resulting arrangements bound to each `id`.
305                // Arrangements made available cannot be used by prior bindings,
306                // as we cannot circulate an arrangement through a `Variable` yet.
307                let mut lir_values = Vec::with_capacity(values.len());
308                for (id, value) in ids.iter().zip_eq(values) {
309                    let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
310                    // If `v_keys` does not contain an unarranged collection, we must form it.
311                    if !v_keys.raw {
312                        // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
313                        let (input_key, permutation, thinning) =
314                            v_keys.arbitrary_arrangement().unwrap();
315                        let mut input_mfp = MapFilterProject::new(value.arity());
316                        input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
317                        let input_key = Some(input_key.clone());
318
319                        let forms = AvailableCollections::new_raw();
320
321                        // We just want to insert an `ArrangeBy` to form an unarranged collection,
322                        // but there is a complication: We shouldn't break the invariant (created by
323                        // `NormalizeLets`, and relied upon by the rendering) that there isn't
324                        // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
325                        // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
326                        // instead of on top of the inner `LetRec`.
327                        lir_value = match lir_value {
328                            Plan {
329                                node:
330                                    PlanNode::LetRec {
331                                        ids,
332                                        values,
333                                        limits,
334                                        body,
335                                    },
336                                lir_id,
337                            } => {
338                                let inner_lir_id = self.allocate_lir_id();
339                                PlanNode::LetRec {
340                                    ids,
341                                    values,
342                                    limits,
343                                    body: Box::new(
344                                        PlanNode::ArrangeBy {
345                                            input_key,
346                                            input: body,
347                                            input_mfp,
348                                            forms,
349                                        }
350                                        .as_plan(inner_lir_id),
351                                    ),
352                                }
353                                .as_plan(lir_id)
354                            }
355                            lir_value => {
356                                let lir_id = self.allocate_lir_id();
357                                PlanNode::ArrangeBy {
358                                    input_key,
359                                    input: Box::new(lir_value),
360                                    input_mfp,
361                                    forms,
362                                }
363                                .as_plan(lir_id)
364                            }
365                        };
366                        v_keys.raw = true;
367                    }
368                    let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
369                    assert_none!(pre_existing);
370                    lir_values.push(lir_value);
371                }
372                // As we exit the iterative scope, we must leave all arrangements behind,
373                // as they reference a timestamp coordinate that must be stripped off.
374                for id in ids.iter() {
375                    self.arrangements
376                        .insert(Id::Local(*id), AvailableCollections::new_raw());
377                }
378                // Plan the body using initial and `value` arrangements,
379                // and then remove reference to the value arrangements.
380                let (body, b_keys) = self.lower_mir_expr(body)?;
381                for id in ids.iter() {
382                    self.arrangements.remove(&Id::Local(*id));
383                }
384                // Return the plan, and any `body` arrangements.
385                let lir_id = self.allocate_lir_id();
386                (
387                    PlanNode::LetRec {
388                        ids: ids.clone(),
389                        values: lir_values,
390                        limits: limits.clone(),
391                        body: Box::new(body),
392                    }
393                    .as_plan(lir_id),
394                    b_keys,
395                )
396            }
397            MirRelationExpr::FlatMap {
398                input: flat_map_input,
399                func,
400                exprs,
401            } => {
402                // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
403                // fused into the lowered `Reduce`.
404                //
405                // In theory, we could have implemented this also as an MIR transform. However, this
406                // is more of a physical optimization, which are sometimes unpleasant to make a part
407                // of the MIR pipeline. The specific problem here with putting this into the MIR
408                // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
409                // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
410                // emit multiple rows per group. Such semantic changes of MIR are very scary, since
411                // various parts of the optimizer assume that Reduce emits only 1 row per group, and
412                // it would be very hard to hunt down all these parts. (For example, key inference
413                // infers the group key as a unique key.)
414                let fused_with_reduce = 'fusion: {
415                    if !matches!(func, TableFunc::UnnestList { .. }) {
416                        break 'fusion None;
417                    }
418                    // We might have a Project of a single col between the FlatMap and the
419                    // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
420                    // result of the window function.)
421                    let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
422                        input: project_input,
423                        outputs: projection,
424                    } = &**flat_map_input
425                    {
426                        // We want this to be a single column, because we'll want to deal with only
427                        // one aggregation in the `Reduce`. (The aggregation of a window function
428                        // always stands alone currently: we plan them separately from other
429                        // aggregations, and Reduces are never fused. When window functions are
430                        // fused with each other, they end up in one aggregation. When there are
431                        // multiple window functions in the same SELECT, but can't be fused, they
432                        // end up in different Reduces.)
433                        if let &[single_col] = &**projection {
434                            (project_input, single_col)
435                        } else {
436                            break 'fusion None;
437                        }
438                    } else {
439                        (flat_map_input, 0)
440                    };
441                    if let MirRelationExpr::Reduce {
442                        input,
443                        group_key,
444                        aggregates,
445                        monotonic,
446                        expected_group_size,
447                    } = &**maybe_reduce
448                    {
449                        if group_key.len() != num_grouping_keys
450                            || aggregates.len() != 1
451                            || !aggregates[0].func.can_fuse_with_unnest_list()
452                        {
453                            break 'fusion None;
454                        }
455                        // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
456                        // above the FlatMap. Later, we'll mutate this to be the residual MFP that
457                        // didn't get fused into the `Reduce`.
458                        let non_fused_mfp_above_flat_map = &mut mfp;
459                        let reduce_output_arity = num_grouping_keys + 1;
460                        // We are fusing away the list that the FlatMap would have been unnesting,
461                        // so the column that had that list disappears, so we have to permute the
462                        // MFP above the FlatMap with this column disappearance.
463                        let tweaked_mfp = {
464                            let mut mfp = non_fused_mfp_above_flat_map.clone();
465                            if mfp.demand().contains(&0) {
466                                // I don't think this can happen currently that this MFP would
467                                // refer to the list column, because both the list column and the
468                                // MFP were constructed by the HIR-to-MIR lowering, so it's not just
469                                // some random MFP that we are seeing here. But anyhow, it's better
470                                // to check this here for robustness against future code changes.
471                                break 'fusion None;
472                            }
473                            let permutation: BTreeMap<_, _> =
474                                (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
475                            mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
476                            mfp
477                        };
478                        // We now put together the project that was before the FlatMap, and the
479                        // tweaked version of the MFP that was after the FlatMap.
480                        // (Part of this MFP might be fused into the Reduce.)
481                        let mut project_and_tweaked_mfp = {
482                            let mut mfp = MapFilterProject::new(reduce_output_arity);
483                            mfp = mfp.project(vec![num_grouping_keys]);
484                            mfp = MapFilterProject::compose(mfp, tweaked_mfp);
485                            mfp
486                        };
487                        let fused = self.lower_reduce(
488                            input,
489                            group_key,
490                            aggregates,
491                            monotonic,
492                            expected_group_size,
493                            &mut project_and_tweaked_mfp,
494                            true,
495                        )?;
496                        // Update the residual MFP.
497                        *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
498                        Some(fused)
499                    } else {
500                        break 'fusion None;
501                    }
502                };
503                if let Some(fused_with_reduce) = fused_with_reduce {
504                    fused_with_reduce
505                } else {
506                    // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
507                    let (input, keys) = self.lower_mir_expr(flat_map_input)?;
508                    // This stage can absorb arbitrary MFP instances.
509                    let mfp = mfp.take();
510                    let mut exprs = exprs.clone();
511                    let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
512                    {
513                        // We don't permute the MFP here, because it runs _after_ the table function,
514                        // whose output is in a fixed order.
515                        //
516                        // We _do_, however, need to permute the `expr`s that provide input to the
517                        // `func`.
518                        let permutation = permutation.iter().cloned().enumerate().collect();
519                        for expr in &mut exprs {
520                            expr.permute_map(&permutation);
521                        }
522
523                        Some(k.clone())
524                    } else {
525                        None
526                    };
527
528                    let lir_id = self.allocate_lir_id();
529                    // Return the plan, and no arrangements.
530                    (
531                        PlanNode::FlatMap {
532                            input_key,
533                            input: Box::new(input),
534                            exprs: exprs.clone(),
535                            func: func.clone(),
536                            mfp_after: mfp,
537                        }
538                        .as_plan(lir_id),
539                        AvailableCollections::new_raw(),
540                    )
541                }
542            }
543            MirRelationExpr::Join {
544                inputs,
545                equivalences,
546                implementation,
547            } => {
548                // Plan each of the join inputs independently.
549                // The `plans` get surfaced upwards, and the `input_keys` should
550                // be used as part of join planning / to validate the existing
551                // plans / to aid in indexed seeding of update streams.
552                let mut plans = Vec::new();
553                let mut input_keys = Vec::new();
554                let mut input_arities = Vec::new();
555                for input in inputs.iter() {
556                    let (plan, keys) = self.lower_mir_expr(input)?;
557                    input_arities.push(input.arity());
558                    plans.push(plan);
559                    input_keys.push(keys);
560                }
561
562                let input_mapper =
563                    JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
564
565                // Extract temporal predicates as joins cannot currently absorb them.
566                let (plan, missing) = match implementation {
567                    IndexedFilter(_coll_id, _idx_id, key, _val) => {
568                        // Start with the constant input. (This used to be important before database-issues#4016
569                        // was fixed.)
570                        let start: usize = 1;
571                        let order = vec![(0usize, key.clone(), None)];
572                        // All columns of the constant input will be part of the arrangement key.
573                        let source_arrangement = (
574                            (0..key.len())
575                                .map(MirScalarExpr::column)
576                                .collect::<Vec<_>>(),
577                            (0..key.len()).collect::<Vec<_>>(),
578                            Vec::<usize>::new(),
579                        );
580                        let (ljp, missing) = LinearJoinPlan::create_from(
581                            start,
582                            Some(&source_arrangement),
583                            equivalences,
584                            &order,
585                            input_mapper,
586                            &mut mfp,
587                            &input_keys,
588                        );
589                        (JoinPlan::Linear(ljp), missing)
590                    }
591                    Differential((start, start_arr, _start_characteristic), order) => {
592                        let source_arrangement = start_arr.as_ref().and_then(|key| {
593                            input_keys[*start]
594                                .arranged
595                                .iter()
596                                .find(|(k, _, _)| k == key)
597                                .clone()
598                        });
599                        let (ljp, missing) = LinearJoinPlan::create_from(
600                            *start,
601                            source_arrangement,
602                            equivalences,
603                            order,
604                            input_mapper,
605                            &mut mfp,
606                            &input_keys,
607                        );
608                        (JoinPlan::Linear(ljp), missing)
609                    }
610                    DeltaQuery(orders) => {
611                        let (djp, missing) = DeltaJoinPlan::create_from(
612                            equivalences,
613                            orders,
614                            input_mapper,
615                            &mut mfp,
616                            &input_keys,
617                        );
618                        (JoinPlan::Delta(djp), missing)
619                    }
620                    // Other plans are errors, and should be reported as such.
621                    Unimplemented => return Err("unimplemented join".to_string()),
622                };
623                // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
624                // `missing`. We thus need to plan them here so they'll exist.
625                let is_delta = matches!(plan, JoinPlan::Delta(_));
626                for (((input_plan, input_keys), missing), arity) in plans
627                    .iter_mut()
628                    .zip_eq(input_keys.iter())
629                    .zip_eq(missing.into_iter())
630                    .zip_eq(input_arities.iter().cloned())
631                {
632                    if missing != Default::default() {
633                        if is_delta {
634                            // join_implementation.rs produced a sub-optimal plan here;
635                            // we shouldn't plan delta joins at all if not all of the required
636                            // arrangements are available. Soft panic in CI and log an error in
637                            // production to increase the chances that we will catch all situations
638                            // that violate this constraint.
639                            soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
640Dataflow info: {}
641This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
642                        } else {
643                            soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
644Dataflow info: {}
645This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
646                            // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
647                            // Join input. (Earlier, they were missing in the following cases:
648                            //  - They were const-folded away for constant inputs. This is not
649                            //    happening since
650                            //    https://github.com/MaterializeInc/materialize/pull/16351
651                            //  - They were not being inserted for the constant input of
652                            //    `IndexedFilter`s. This was fixed in
653                            //    https://github.com/MaterializeInc/materialize/pull/20920
654                            //  - They were not being inserted for the first input of Differential
655                            //    joins. This was fixed in
656                            //    https://github.com/MaterializeInc/materialize/pull/16099)
657                        }
658                        let lir_id = self.allocate_lir_id();
659                        let raw_plan = std::mem::replace(
660                            input_plan,
661                            PlanNode::Constant {
662                                rows: Ok(Vec::new()),
663                            }
664                            .as_plan(lir_id),
665                        );
666                        *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
667                    }
668                }
669                // Return the plan, and no arrangements.
670                let lir_id = self.allocate_lir_id();
671                (
672                    PlanNode::Join {
673                        inputs: plans,
674                        plan,
675                    }
676                    .as_plan(lir_id),
677                    AvailableCollections::new_raw(),
678                )
679            }
680            MirRelationExpr::Reduce {
681                input,
682                group_key,
683                aggregates,
684                monotonic,
685                expected_group_size,
686            } => {
687                if aggregates
688                    .iter()
689                    .any(|agg| agg.func.can_fuse_with_unnest_list())
690                {
691                    // This case should have been handled at the `MirRelationExpr::FlatMap` case
692                    // above. But that has a pretty complicated pattern matching, so it's not
693                    // unthinkable that it fails.
694                    soft_panic_or_log!(
695                        "Window function performance issue: `reduce_unnest_list_fusion` failed"
696                    );
697                }
698                self.lower_reduce(
699                    input,
700                    group_key,
701                    aggregates,
702                    monotonic,
703                    expected_group_size,
704                    &mut mfp,
705                    false,
706                )?
707            }
708            MirRelationExpr::TopK {
709                input,
710                group_key,
711                order_key,
712                limit,
713                offset,
714                monotonic,
715                expected_group_size,
716            } => {
717                let arity = input.arity();
718                let (input, keys) = self.lower_mir_expr(input)?;
719
720                let top_k_plan = TopKPlan::create_from(
721                    group_key.clone(),
722                    order_key.clone(),
723                    *offset,
724                    limit.clone(),
725                    arity,
726                    *monotonic,
727                    *expected_group_size,
728                );
729
730                // We don't have an MFP here -- install an operator to permute the
731                // input, if necessary.
732                let input = if !keys.raw {
733                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
734                } else {
735                    input
736                };
737                // Return the plan, and no arrangements.
738                let lir_id = self.allocate_lir_id();
739                (
740                    PlanNode::TopK {
741                        input: Box::new(input),
742                        top_k_plan,
743                    }
744                    .as_plan(lir_id),
745                    AvailableCollections::new_raw(),
746                )
747            }
748            MirRelationExpr::Negate { input } => {
749                let arity = input.arity();
750                let (input, keys) = self.lower_mir_expr(input)?;
751
752                // We don't have an MFP here -- install an operator to permute the
753                // input, if necessary.
754                let input = if !keys.raw {
755                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
756                } else {
757                    input
758                };
759                // Return the plan, and no arrangements.
760                let lir_id = self.allocate_lir_id();
761                (
762                    PlanNode::Negate {
763                        input: Box::new(input),
764                    }
765                    .as_plan(lir_id),
766                    AvailableCollections::new_raw(),
767                )
768            }
769            MirRelationExpr::Threshold { input } => {
770                let (plan, keys) = self.lower_mir_expr(input)?;
771                let arity = input.arity();
772                let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
773                let plan = if !keys
774                    .arranged
775                    .iter()
776                    .any(|(key, _, _)| key == &required_arrangement.0)
777                {
778                    self.arrange_by(
779                        plan,
780                        AvailableCollections::new_arranged(vec![required_arrangement]),
781                        &keys,
782                        arity,
783                    )
784                } else {
785                    plan
786                };
787
788                let output_keys = threshold_plan.keys();
789                // Return the plan, and any produced keys.
790                let lir_id = self.allocate_lir_id();
791                (
792                    PlanNode::Threshold {
793                        input: Box::new(plan),
794                        threshold_plan,
795                    }
796                    .as_plan(lir_id),
797                    output_keys,
798                )
799            }
800            MirRelationExpr::Union { base, inputs } => {
801                let arity = base.arity();
802                let mut plans_keys = Vec::with_capacity(1 + inputs.len());
803                let (plan, keys) = self.lower_mir_expr(base)?;
804                plans_keys.push((plan, keys));
805                for input in inputs.iter() {
806                    let (plan, keys) = self.lower_mir_expr(input)?;
807                    plans_keys.push((plan, keys));
808                }
809                let plans = plans_keys
810                    .into_iter()
811                    .map(|(plan, keys)| {
812                        // We don't have an MFP here -- install an operator to permute the
813                        // input, if necessary.
814                        if !keys.raw {
815                            self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
816                        } else {
817                            plan
818                        }
819                    })
820                    .collect();
821                // Return the plan and no arrangements.
822                let lir_id = self.allocate_lir_id();
823                (
824                    PlanNode::Union {
825                        inputs: plans,
826                        consolidate_output: false,
827                    }
828                    .as_plan(lir_id),
829                    AvailableCollections::new_raw(),
830                )
831            }
832            MirRelationExpr::ArrangeBy { input, keys } => {
833                let input_mir = input;
834                let (input, mut input_keys) = self.lower_mir_expr(input)?;
835                // Fill the `types` in `input_keys` if not already present.
836                let arity = input_mir.arity();
837
838                // Determine keys that are not present in `input_keys`.
839                let new_keys = keys
840                    .iter()
841                    .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
842                    .cloned()
843                    .collect::<Vec<_>>();
844                if new_keys.is_empty() {
845                    (input, input_keys)
846                } else {
847                    let mut new_keys = new_keys
848                        .iter()
849                        .cloned()
850                        .map(|k| {
851                            let (permutation, thinning) = permutation_for_arrangement(&k, arity);
852                            (k, permutation, thinning)
853                        })
854                        .collect::<Vec<_>>();
855                    let forms = AvailableCollections {
856                        raw: input_keys.raw,
857                        arranged: new_keys.clone(),
858                    };
859                    let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
860                        input_keys.arbitrary_arrangement()
861                    {
862                        let mut mfp = MapFilterProject::new(arity);
863                        mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
864                        (Some(input_key.clone()), mfp)
865                    } else {
866                        (None, MapFilterProject::new(arity))
867                    };
868                    input_keys.arranged.append(&mut new_keys);
869                    input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
870
871                    // Return the plan and extended keys.
872                    let lir_id = self.allocate_lir_id();
873                    (
874                        PlanNode::ArrangeBy {
875                            input_key,
876                            input: Box::new(input),
877                            input_mfp,
878                            forms,
879                        }
880                        .as_plan(lir_id),
881                        input_keys,
882                    )
883                }
884            }
885        };
886
887        // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
888        if !mfp.is_identity() {
889            // Seek out an arrangement key that might be constrained to a literal.
890            // TODO: Improve key selection heuristic.
891            let key_val = keys
892                .arranged
893                .iter()
894                .filter_map(|(key, permutation, thinning)| {
895                    let mut mfp = mfp.clone();
896                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
897                    mfp.literal_constraints(key)
898                        .map(|val| (key.clone(), permutation, thinning, val))
899                })
900                .max_by_key(|(key, _, _, _)| key.len());
901
902            // Input key selection strategy:
903            // (1) If we can read a key at a particular value, do so
904            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
905            // therefore allows us to avoid discarding the arrangement, use that.
906            // (3) Otherwise, if there is _some_ key, use that,
907            // (4) Otherwise just read the raw collection.
908            let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
909                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
910
911                Some((key, Some(val)))
912            } else if let Some((key, permutation, thinning)) =
913                keys.arranged.iter().find(|(key, permutation, thinning)| {
914                    let mut mfp = mfp.clone();
915                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
916                    mfp.is_identity()
917                })
918            {
919                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
920                Some((key.clone(), None))
921            } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
922                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
923                Some((key.clone(), None))
924            } else {
925                None
926            };
927
928            if mfp.is_identity() {
929                // We have discovered a key
930                // whose permutation causes the MFP to actually
931                // be the identity! We can keep it around,
932                // but without its permutation this time,
933                // and with a trivial thinning of the right length.
934                let (key, val) = input_key_val.unwrap();
935                let (_old_key, old_permutation, old_thinning) = keys
936                    .arranged
937                    .iter_mut()
938                    .find(|(key2, _, _)| key2 == &key)
939                    .unwrap();
940                *old_permutation = (0..mfp.input_arity).collect();
941                let old_thinned_arity = old_thinning.len();
942                *old_thinning = (0..old_thinned_arity).collect();
943                // Get rid of all other forms, as this is now the only one known to be valid.
944                // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
945                // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
946                keys.arranged.retain(|(key2, _, _)| key2 == &key);
947                keys.raw = false;
948
949                // Creating a Plan::Mfp node is now logically unnecessary, but we
950                // should do so anyway when `val` is populated, so that
951                // the `key_val` optimization gets applied.
952                let lir_id = self.allocate_lir_id();
953                if val.is_some() {
954                    plan = PlanNode::Mfp {
955                        input: Box::new(plan),
956                        mfp,
957                        input_key_val: Some((key, val)),
958                    }
959                    .as_plan(lir_id)
960                }
961            } else {
962                let lir_id = self.allocate_lir_id();
963                plan = PlanNode::Mfp {
964                    input: Box::new(plan),
965                    mfp,
966                    input_key_val,
967                }
968                .as_plan(lir_id);
969                keys = AvailableCollections::new_raw();
970            }
971        }
972
973        Ok((plan, keys))
974    }
975
976    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
977    /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
978    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
979    /// fused.
980    fn lower_reduce(
981        &mut self,
982        input: &MirRelationExpr,
983        group_key: &Vec<MirScalarExpr>,
984        aggregates: &Vec<AggregateExpr>,
985        monotonic: &bool,
986        expected_group_size: &Option<u64>,
987        mfp_on_top: &mut MapFilterProject,
988        fused_unnest_list: bool,
989    ) -> Result<(Plan, AvailableCollections), String> {
990        let input_arity = input.arity();
991        let (input, keys) = self.lower_mir_expr(input)?;
992        let (input_key, permutation_and_new_arity) =
993            if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
994                (
995                    Some(input_key.clone()),
996                    Some((permutation.clone(), thinning.len() + input_key.len())),
997                )
998            } else {
999                (None, None)
1000            };
1001        let key_val_plan = KeyValPlan::new(
1002            input_arity,
1003            group_key,
1004            aggregates,
1005            permutation_and_new_arity,
1006        );
1007        let reduce_plan = ReducePlan::create_from(
1008            aggregates.clone(),
1009            *monotonic,
1010            *expected_group_size,
1011            fused_unnest_list,
1012        );
1013        // Return the plan, and the keys it produces.
1014        let mfp_after;
1015        let output_arity;
1016        if self.enable_reduce_mfp_fusion {
1017            (mfp_after, *mfp_on_top, output_arity) =
1018                reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1019        } else {
1020            (mfp_after, output_arity) = (
1021                MapFilterProject::new(mfp_on_top.input_arity),
1022                group_key.len() + aggregates.len(),
1023            );
1024        }
1025        soft_assert_eq_or_log!(
1026            mfp_on_top.input_arity,
1027            output_arity,
1028            "Output arity of reduce must match input arity for MFP on top of it"
1029        );
1030        let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1031        let lir_id = self.allocate_lir_id();
1032        Ok((
1033            PlanNode::Reduce {
1034                input_key,
1035                input: Box::new(input),
1036                key_val_plan,
1037                plan: reduce_plan,
1038                mfp_after,
1039            }
1040            .as_plan(lir_id),
1041            output_keys,
1042        ))
1043    }
1044
1045    /// Replace the plan with another one
1046    /// that has the collection in some additional forms.
1047    pub fn arrange_by(
1048        &mut self,
1049        plan: Plan,
1050        collections: AvailableCollections,
1051        old_collections: &AvailableCollections,
1052        arity: usize,
1053    ) -> Plan {
1054        if let Plan {
1055            node:
1056                PlanNode::ArrangeBy {
1057                    input_key,
1058                    input,
1059                    input_mfp,
1060                    mut forms,
1061                },
1062            lir_id,
1063        } = plan
1064        {
1065            forms.raw |= collections.raw;
1066            forms.arranged.extend(collections.arranged);
1067            forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1068            forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1069            PlanNode::ArrangeBy {
1070                input_key,
1071                input,
1072                input_mfp,
1073                forms,
1074            }
1075            .as_plan(lir_id)
1076        } else {
1077            let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1078                old_collections.arbitrary_arrangement()
1079            {
1080                let mut mfp = MapFilterProject::new(arity);
1081                mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1082                (Some(input_key.clone()), mfp)
1083            } else {
1084                (None, MapFilterProject::new(arity))
1085            };
1086            let lir_id = self.allocate_lir_id();
1087
1088            PlanNode::ArrangeBy {
1089                input_key,
1090                input: Box::new(plan),
1091                input_mfp,
1092                forms: collections,
1093            }
1094            .as_plan(lir_id)
1095        }
1096    }
1097}
1098
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
///
/// Its `Display` impl renders it as `Debug name: <debug_name>; id: <id>`.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Name printed after `Debug name:` in error messages. Presumably this is the
    /// `debug_name` passed to `Context::new`; confirm against the constructor.
    debug_name: String,
    /// Identifier printed after `id:` in error messages. Presumably this is the id of the
    /// object being planned; TODO confirm.
    id: GlobalId,
}
1106
1107impl std::fmt::Display for LirDebugInfo {
1108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1109        write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1110    }
1111}