mz_compute_types/plan/
lowering.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::BTreeMap;
13
14use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
15use mz_expr::{
16    AggregateExpr, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
17    OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
18};
19use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
20use mz_repr::GlobalId;
21use mz_repr::optimize::OptimizerFeatures;
22use timely::progress::Timestamp;
23
24use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
25use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
26use crate::plan::reduce::{KeyValPlan, ReducePlan};
27use crate::plan::threshold::ThresholdPlan;
28use crate::plan::top_k::TopKPlan;
29use crate::plan::{AvailableCollections, GetPlan, LirId, Plan, PlanNode};
30
31pub(super) struct Context {
32    /// Known bindings to (possibly arranged) collections.
33    arrangements: BTreeMap<Id, AvailableCollections>,
34    /// Tracks the next available `LirId`.
35    next_lir_id: LirId,
36    /// Information to print along with error messages.
37    debug_info: LirDebugInfo,
38    /// Whether to enable fusion of MFPs in reductions.
39    enable_reduce_mfp_fusion: bool,
40}
41
42impl Context {
43    pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
44        Self {
45            arrangements: Default::default(),
46            next_lir_id: LirId(1),
47            debug_info: LirDebugInfo {
48                debug_name,
49                id: GlobalId::Transient(0),
50            },
51            enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
52        }
53    }
54
55    fn allocate_lir_id(&mut self) -> LirId {
56        let id = self.next_lir_id;
57        self.next_lir_id = LirId(
58            self.next_lir_id
59                .0
60                .checked_add(1)
61                .expect("No LirId overflow"),
62        );
63        id
64    }
65
66    pub fn lower<T: Timestamp>(
67        mut self,
68        desc: DataflowDescription<OptimizedMirRelationExpr>,
69    ) -> Result<DataflowDescription<Plan<T>>, String> {
70        // Sources might provide arranged forms of their data, in the future.
71        // Indexes provide arranged forms of their data.
72        for IndexImport {
73            desc: index_desc,
74            typ,
75            ..
76        } in desc.index_imports.values()
77        {
78            let key = index_desc.key.clone();
79            // TODO[btv] - We should be told the permutation by
80            // `index_desc`, and it should have been generated
81            // at the same point the thinning logic was.
82            //
83            // We should for sure do that soon, but it requires
84            // a bit of a refactor, so for now we just
85            // _assume_ that they were both generated by `permutation_for_arrangement`,
86            // and recover it here.
87            let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
88            let index_keys = self
89                .arrangements
90                .entry(Id::Global(index_desc.on_id))
91                .or_insert_with(AvailableCollections::default);
92            index_keys.arranged.push((key, permutation, thinning));
93            index_keys.types = Some(typ.column_types.clone());
94        }
95        for id in desc.source_imports.keys() {
96            self.arrangements
97                .entry(Id::Global(*id))
98                .or_insert_with(AvailableCollections::new_raw);
99        }
100
101        // Build each object in order, registering the arrangements it forms.
102        let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
103        for build in desc.objects_to_build {
104            self.debug_info.id = build.id;
105            let (plan, keys) = self.lower_mir_expr(&build.plan)?;
106
107            self.arrangements.insert(Id::Global(build.id), keys);
108            objects_to_build.push(BuildDesc { id: build.id, plan });
109        }
110
111        Ok(DataflowDescription {
112            source_imports: desc.source_imports,
113            index_imports: desc.index_imports,
114            objects_to_build,
115            index_exports: desc.index_exports,
116            sink_exports: desc.sink_exports,
117            as_of: desc.as_of,
118            until: desc.until,
119            initial_storage_as_of: desc.initial_storage_as_of,
120            refresh_schedule: desc.refresh_schedule,
121            debug_name: desc.debug_name,
122            time_dependence: desc.time_dependence,
123        })
124    }
125
126    /// This method converts a MirRelationExpr into a plan that can be directly rendered.
127    ///
128    /// The rough structure is that we repeatedly extract map/filter/project operators
129    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
130    /// then produce a plan for the combination of that with the next operator.
131    ///
132    /// The method accesses `self.arrangements`, which it will locally add to and remove from for
133    /// `Let` bindings (by the end of the call it should contain the same bindings as when it
134    /// started).
135    ///
136    /// The result of the method is both a `Plan`, but also a list of arrangements that
137    /// are certain to be produced, which can be relied on by the next steps in the plan.
138    /// Each of the arrangement keys is associated with an MFP that must be applied if that
139    /// arrangement is used, to back out the permutation associated with that arrangement.
140    ///
141    /// An empty list of arrangement keys indicates that only a `Collection` stream can
142    /// be assumed to exist.
143    fn lower_mir_expr<T: Timestamp>(
144        &mut self,
145        expr: &MirRelationExpr,
146    ) -> Result<(Plan<T>, AvailableCollections), String> {
147        // This function is recursive and can overflow its stack, so grow it if
148        // needed. The growth here is unbounded. Our general solution for this problem
149        // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
150        // depth. That however requires upstream error handling. This function is
151        // currently called by the Coordinator after calls to `catalog_transact`,
152        // and thus are not allowed to fail. Until that allows errors, we choose
153        // to allow the unbounded growth here. We are though somewhat protected by
154        // higher levels enforcing their own limits on stack depth (in the parser,
155        // transformer/desugarer, and planner).
156        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
157    }
158
159    fn lower_mir_expr_stack_safe<T>(
160        &mut self,
161        expr: &MirRelationExpr,
162    ) -> Result<(Plan<T>, AvailableCollections), String>
163    where
164        T: Timestamp,
165    {
166        // Extract a maximally large MapFilterProject from `expr`.
167        // We will then try and push this in to the resulting expression.
168        //
169        // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
170        // While we would eventually like all plan stages to be able to absorb such
171        // general operators, not all of them can.
172        let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
173        // We attempt to plan what we have remaining, in the context of `mfp`.
174        // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
175        let (mut plan, mut keys) = match expr {
176            // These operators should have been extracted from the expression.
177            MirRelationExpr::Map { .. } => {
178                panic!("This operator should have been extracted");
179            }
180            MirRelationExpr::Filter { .. } => {
181                panic!("This operator should have been extracted");
182            }
183            MirRelationExpr::Project { .. } => {
184                panic!("This operator should have been extracted");
185            }
186            // These operators may not have been extracted, and need to result in a `Plan`.
187            MirRelationExpr::Constant { rows, typ: _ } => {
188                let lir_id = self.allocate_lir_id();
189                let node = PlanNode::Constant {
190                    rows: rows.clone().map(|rows| {
191                        rows.into_iter()
192                            .map(|(row, diff)| (row, T::minimum(), diff))
193                            .collect()
194                    }),
195                };
196                // The plan, not arranged in any way.
197                (node.as_plan(lir_id), AvailableCollections::new_raw())
198            }
199            MirRelationExpr::Get { id, typ: _, .. } => {
200                // This stage can absorb arbitrary MFP operators.
201                let mut mfp = mfp.take();
202                // If `mfp` is the identity, we can surface all imported arrangements.
203                // Otherwise, we apply `mfp` and promise no arrangements.
204                let mut in_keys = self
205                    .arrangements
206                    .get(id)
207                    .cloned()
208                    .unwrap_or_else(AvailableCollections::new_raw);
209
210                // Seek out an arrangement key that might be constrained to a literal.
211                // Note: this code has very little use nowadays, as its job was mostly taken over
212                // by `LiteralConstraints` (see in the below longer comment).
213                let key_val = in_keys
214                    .arranged
215                    .iter()
216                    .filter_map(|key| {
217                        mfp.literal_constraints(&key.0)
218                            .map(|val| (key.clone(), val))
219                    })
220                    .max_by_key(|(key, _val)| key.0.len());
221
222                // Determine the plan of action for the `Get` stage.
223                let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
224                    // This code path used to handle looking up literals from indexes, but it's
225                    // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
226                    // MIR transform instead. However, it's still called in a couple of tricky
227                    // special cases:
228                    // - `LiteralConstraints` handles only Gets of global ids, so this code still
229                    //   gets to handle Filters on top of Gets of local ids.
230                    // - Lowering does a `MapFilterProject::extract_from_expression`, while
231                    //   `LiteralConstraints` does
232                    //   `MapFilterProject::extract_non_errors_from_expr_mut`.
233                    // - It might happen that new literal constraint optimization opportunities
234                    //   appear somewhere near the end of the MIR optimizer after
235                    //   `LiteralConstraints` has already run.
236                    // (Also note that a similar literal constraint handling machinery is also
237                    // present when handling the leftover MFP after this big match.)
238                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
239                    in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
240                    GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
241                } else if !mfp.is_identity() {
242                    // We need to ensure a collection exists, which means we must form it.
243                    if let Some((key, permutation, thinning)) =
244                        in_keys.arbitrary_arrangement().cloned()
245                    {
246                        mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
247                        in_keys.arranged = vec![(key.clone(), permutation, thinning)];
248                        GetPlan::Arrangement(key, None, mfp)
249                    } else {
250                        GetPlan::Collection(mfp)
251                    }
252                } else {
253                    // By default, just pass input arrangements through.
254                    GetPlan::PassArrangements
255                };
256
257                let out_keys = if let GetPlan::PassArrangements = plan {
258                    in_keys.clone()
259                } else {
260                    AvailableCollections::new_raw()
261                };
262
263                let lir_id = self.allocate_lir_id();
264                let node = PlanNode::Get {
265                    id: id.clone(),
266                    keys: in_keys,
267                    plan,
268                };
269                // Return the plan, and any keys if an identity `mfp`.
270                (node.as_plan(lir_id), out_keys)
271            }
272            MirRelationExpr::Let { id, value, body } => {
273                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
274                // that they would be pushed down. I am not sure if we should take the
275                // initiative to push down the `mfp` ourselves.
276
277                // Plan the value using only the initial arrangements, but
278                // introduce any resulting arrangements bound to `id`.
279                let (value, v_keys) = self.lower_mir_expr(value)?;
280                let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
281                assert_none!(pre_existing);
282                // Plan the body using initial and `value` arrangements,
283                // and then remove reference to the value arrangements.
284                let (body, b_keys) = self.lower_mir_expr(body)?;
285                self.arrangements.remove(&Id::Local(*id));
286                // Return the plan, and any `body` arrangements.
287                let lir_id = self.allocate_lir_id();
288                (
289                    PlanNode::Let {
290                        id: id.clone(),
291                        value: Box::new(value),
292                        body: Box::new(body),
293                    }
294                    .as_plan(lir_id),
295                    b_keys,
296                )
297            }
298            MirRelationExpr::LetRec {
299                ids,
300                values,
301                limits,
302                body,
303            } => {
304                assert_eq!(ids.len(), values.len());
305                assert_eq!(ids.len(), limits.len());
306                // Plan the values using only the available arrangements, but
307                // introduce any resulting arrangements bound to each `id`.
308                // Arrangements made available cannot be used by prior bindings,
309                // as we cannot circulate an arrangement through a `Variable` yet.
310                let mut lir_values = Vec::with_capacity(values.len());
311                for (id, value) in ids.iter().zip(values) {
312                    let (mut lir_value, mut v_keys) = self.lower_mir_expr(value)?;
313                    // If `v_keys` does not contain an unarranged collection, we must form it.
314                    if !v_keys.raw {
315                        // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
316                        let (input_key, permutation, thinning) =
317                            v_keys.arbitrary_arrangement().unwrap();
318                        let mut input_mfp = MapFilterProject::new(value.arity());
319                        input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
320                        let input_key = Some(input_key.clone());
321
322                        let forms = AvailableCollections::new_raw();
323
324                        // We just want to insert an `ArrangeBy` to form an unarranged collection,
325                        // but there is a complication: We shouldn't break the invariant (created by
326                        // `NormalizeLets`, and relied upon by the rendering) that there isn't
327                        // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
328                        // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
329                        // instead of on top of the inner `LetRec`.
330                        lir_value = match lir_value {
331                            Plan {
332                                node:
333                                    PlanNode::LetRec {
334                                        ids,
335                                        values,
336                                        limits,
337                                        body,
338                                    },
339                                lir_id,
340                            } => {
341                                let inner_lir_id = self.allocate_lir_id();
342                                PlanNode::LetRec {
343                                    ids,
344                                    values,
345                                    limits,
346                                    body: Box::new(
347                                        PlanNode::ArrangeBy {
348                                            input: body,
349                                            forms,
350                                            input_key,
351                                            input_mfp,
352                                        }
353                                        .as_plan(inner_lir_id),
354                                    ),
355                                }
356                                .as_plan(lir_id)
357                            }
358                            lir_value => {
359                                let lir_id = self.allocate_lir_id();
360                                PlanNode::ArrangeBy {
361                                    input: Box::new(lir_value),
362                                    forms,
363                                    input_key,
364                                    input_mfp,
365                                }
366                                .as_plan(lir_id)
367                            }
368                        };
369                        v_keys.raw = true;
370                    }
371                    let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
372                    assert_none!(pre_existing);
373                    lir_values.push(lir_value);
374                }
375                // As we exit the iterative scope, we must leave all arrangements behind,
376                // as they reference a timestamp coordinate that must be stripped off.
377                for id in ids.iter() {
378                    self.arrangements
379                        .insert(Id::Local(*id), AvailableCollections::new_raw());
380                }
381                // Plan the body using initial and `value` arrangements,
382                // and then remove reference to the value arrangements.
383                let (body, b_keys) = self.lower_mir_expr(body)?;
384                for id in ids.iter() {
385                    self.arrangements.remove(&Id::Local(*id));
386                }
387                // Return the plan, and any `body` arrangements.
388                let lir_id = self.allocate_lir_id();
389                (
390                    PlanNode::LetRec {
391                        ids: ids.clone(),
392                        values: lir_values,
393                        limits: limits.clone(),
394                        body: Box::new(body),
395                    }
396                    .as_plan(lir_id),
397                    b_keys,
398                )
399            }
400            MirRelationExpr::FlatMap {
401                input: flat_map_input,
402                func,
403                exprs,
404            } => {
405                // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
406                // fused into the lowered `Reduce`.
407                //
408                // In theory, we could have implemented this also as an MIR transform. However, this
409                // is more of a physical optimization, which are sometimes unpleasant to make a part
410                // of the MIR pipeline. The specific problem here with putting this into the MIR
411                // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
412                // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
413                // emit multiple rows per group. Such semantic changes of MIR are very scary, since
414                // various parts of the optimizer assume that Reduce emits only 1 row per group, and
415                // it would be very hard to hunt down all these parts. (For example, key inference
416                // infers the group key as a unique key.)
417                let fused_with_reduce = 'fusion: {
418                    if !matches!(func, TableFunc::UnnestList { .. }) {
419                        break 'fusion None;
420                    }
421                    // We might have a Project of a single col between the FlatMap and the
422                    // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
423                    // result of the window function.)
424                    let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
425                        input: project_input,
426                        outputs: projection,
427                    } = &**flat_map_input
428                    {
429                        // We want this to be a single column, because we'll want to deal with only
430                        // one aggregation in the `Reduce`. (The aggregation of a window function
431                        // always stands alone currently: we plan them separately from other
432                        // aggregations, and Reduces are never fused. When window functions are
433                        // fused with each other, they end up in one aggregation. When there are
434                        // multiple window functions in the same SELECT, but can't be fused, they
435                        // end up in different Reduces.)
436                        if let &[single_col] = &**projection {
437                            (project_input, single_col)
438                        } else {
439                            break 'fusion None;
440                        }
441                    } else {
442                        (flat_map_input, 0)
443                    };
444                    if let MirRelationExpr::Reduce {
445                        input,
446                        group_key,
447                        aggregates,
448                        monotonic,
449                        expected_group_size,
450                    } = &**maybe_reduce
451                    {
452                        if group_key.len() != num_grouping_keys
453                            || aggregates.len() != 1
454                            || !aggregates[0].func.can_fuse_with_unnest_list()
455                        {
456                            break 'fusion None;
457                        }
458                        // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
459                        // above the FlatMap. Later, we'll mutate this to be the residual MFP that
460                        // didn't get fused into the `Reduce`.
461                        let non_fused_mfp_above_flat_map = &mut mfp;
462                        let reduce_output_arity = num_grouping_keys + 1;
463                        // We are fusing away the list that the FlatMap would have been unnesting,
464                        // so the column that had that list disappears, so we have to permute the
465                        // MFP above the FlatMap with this column disappearance.
466                        let tweaked_mfp = {
467                            let mut mfp = non_fused_mfp_above_flat_map.clone();
468                            if mfp.demand().contains(&0) {
469                                // I don't think this can happen currently that this MFP would
470                                // refer to the list column, because both the list column and the
471                                // MFP were constructed by the HIR-to-MIR lowering, so it's not just
472                                // some random MFP that we are seeing here. But anyhow, it's better
473                                // to check this here for robustness against future code changes.
474                                break 'fusion None;
475                            }
476                            let permutation: BTreeMap<_, _> =
477                                (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
478                            mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
479                            mfp
480                        };
481                        // We now put together the project that was before the FlatMap, and the
482                        // tweaked version of the MFP that was after the FlatMap.
483                        // (Part of this MFP might be fused into the Reduce.)
484                        let mut project_and_tweaked_mfp = {
485                            let mut mfp = MapFilterProject::new(reduce_output_arity);
486                            mfp = mfp.project(vec![num_grouping_keys]);
487                            mfp = MapFilterProject::compose(mfp, tweaked_mfp);
488                            mfp
489                        };
490                        let fused = self.lower_reduce(
491                            input,
492                            group_key,
493                            aggregates,
494                            monotonic,
495                            expected_group_size,
496                            &mut project_and_tweaked_mfp,
497                            true,
498                        )?;
499                        // Update the residual MFP.
500                        *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
501                        Some(fused)
502                    } else {
503                        break 'fusion None;
504                    }
505                };
506                if let Some(fused_with_reduce) = fused_with_reduce {
507                    fused_with_reduce
508                } else {
509                    // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
510                    let (input, keys) = self.lower_mir_expr(flat_map_input)?;
511                    // This stage can absorb arbitrary MFP instances.
512                    let mfp = mfp.take();
513                    let mut exprs = exprs.clone();
514                    let input_key = if let Some((k, permutation, _)) = keys.arbitrary_arrangement()
515                    {
516                        // We don't permute the MFP here, because it runs _after_ the table function,
517                        // whose output is in a fixed order.
518                        //
519                        // We _do_, however, need to permute the `expr`s that provide input to the
520                        // `func`.
521                        let permutation = permutation.iter().cloned().enumerate().collect();
522                        for expr in &mut exprs {
523                            expr.permute_map(&permutation);
524                        }
525
526                        Some(k.clone())
527                    } else {
528                        None
529                    };
530
531                    let lir_id = self.allocate_lir_id();
532                    // Return the plan, and no arrangements.
533                    (
534                        PlanNode::FlatMap {
535                            input: Box::new(input),
536                            func: func.clone(),
537                            exprs: exprs.clone(),
538                            mfp_after: mfp,
539                            input_key,
540                        }
541                        .as_plan(lir_id),
542                        AvailableCollections::new_raw(),
543                    )
544                }
545            }
546            MirRelationExpr::Join {
547                inputs,
548                equivalences,
549                implementation,
550            } => {
551                // Plan each of the join inputs independently.
552                // The `plans` get surfaced upwards, and the `input_keys` should
553                // be used as part of join planning / to validate the existing
554                // plans / to aid in indexed seeding of update streams.
555                let mut plans = Vec::new();
556                let mut input_keys = Vec::new();
557                let mut input_arities = Vec::new();
558                for input in inputs.iter() {
559                    let (plan, keys) = self.lower_mir_expr(input)?;
560                    input_arities.push(input.arity());
561                    plans.push(plan);
562                    input_keys.push(keys);
563                }
564
565                let input_mapper =
566                    JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
567
568                // Extract temporal predicates as joins cannot currently absorb them.
569                let (plan, missing) = match implementation {
570                    IndexedFilter(_coll_id, _idx_id, key, _val) => {
571                        // Start with the constant input. (This used to be important before database-issues#4016
572                        // was fixed.)
573                        let start: usize = 1;
574                        let order = vec![(0usize, key.clone(), None)];
575                        // All columns of the constant input will be part of the arrangement key.
576                        let source_arrangement = (
577                            (0..key.len())
578                                .map(MirScalarExpr::Column)
579                                .collect::<Vec<_>>(),
580                            (0..key.len()).collect::<Vec<_>>(),
581                            Vec::<usize>::new(),
582                        );
583                        let (ljp, missing) = LinearJoinPlan::create_from(
584                            start,
585                            Some(&source_arrangement),
586                            equivalences,
587                            &order,
588                            input_mapper,
589                            &mut mfp,
590                            &input_keys,
591                        );
592                        (JoinPlan::Linear(ljp), missing)
593                    }
594                    Differential((start, start_arr, _start_characteristic), order) => {
595                        let source_arrangement = start_arr.as_ref().and_then(|key| {
596                            input_keys[*start]
597                                .arranged
598                                .iter()
599                                .find(|(k, _, _)| k == key)
600                                .clone()
601                        });
602                        let (ljp, missing) = LinearJoinPlan::create_from(
603                            *start,
604                            source_arrangement,
605                            equivalences,
606                            order,
607                            input_mapper,
608                            &mut mfp,
609                            &input_keys,
610                        );
611                        (JoinPlan::Linear(ljp), missing)
612                    }
613                    DeltaQuery(orders) => {
614                        let (djp, missing) = DeltaJoinPlan::create_from(
615                            equivalences,
616                            orders,
617                            input_mapper,
618                            &mut mfp,
619                            &input_keys,
620                        );
621                        (JoinPlan::Delta(djp), missing)
622                    }
623                    // Other plans are errors, and should be reported as such.
624                    Unimplemented => return Err("unimplemented join".to_string()),
625                };
626                // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
627                // `missing`. We thus need to plan them here so they'll exist.
628                let is_delta = matches!(plan, JoinPlan::Delta(_));
629                for (((input_plan, input_keys), missing), arity) in plans
630                    .iter_mut()
631                    .zip(input_keys.iter())
632                    .zip(missing.into_iter())
633                    .zip(input_arities.iter().cloned())
634                {
635                    if missing != Default::default() {
636                        if is_delta {
637                            // join_implementation.rs produced a sub-optimal plan here;
638                            // we shouldn't plan delta joins at all if not all of the required
639                            // arrangements are available. Soft panic in CI and log an error in
640                            // production to increase the chances that we will catch all situations
641                            // that violate this constraint.
642                            soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
643Dataflow info: {}
644This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
645                        } else {
646                            soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
647Dataflow info: {}
648This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
649                            // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
650                            // Join input. (Earlier, they were missing in the following cases:
651                            //  - They were const-folded away for constant inputs. This is not
652                            //    happening since
653                            //    https://github.com/MaterializeInc/materialize/pull/16351
654                            //  - They were not being inserted for the constant input of
655                            //    `IndexedFilter`s. This was fixed in
656                            //    https://github.com/MaterializeInc/materialize/pull/20920
657                            //  - They were not being inserted for the first input of Differential
658                            //    joins. This was fixed in
659                            //    https://github.com/MaterializeInc/materialize/pull/16099)
660                        }
661                        let lir_id = self.allocate_lir_id();
662                        let raw_plan = std::mem::replace(
663                            input_plan,
664                            PlanNode::Constant {
665                                rows: Ok(Vec::new()),
666                            }
667                            .as_plan(lir_id),
668                        );
669                        *input_plan = self.arrange_by(raw_plan, missing, input_keys, arity);
670                    }
671                }
672                // Return the plan, and no arrangements.
673                let lir_id = self.allocate_lir_id();
674                (
675                    PlanNode::Join {
676                        inputs: plans,
677                        plan,
678                    }
679                    .as_plan(lir_id),
680                    AvailableCollections::new_raw(),
681                )
682            }
683            MirRelationExpr::Reduce {
684                input,
685                group_key,
686                aggregates,
687                monotonic,
688                expected_group_size,
689            } => {
690                if aggregates
691                    .iter()
692                    .any(|agg| agg.func.can_fuse_with_unnest_list())
693                {
694                    // This case should have been handled at the `MirRelationExpr::FlatMap` case
695                    // above. But that has a pretty complicated pattern matching, so it's not
696                    // unthinkable that it fails.
697                    soft_panic_or_log!(
698                        "Window function performance issue: `reduce_unnest_list_fusion` failed"
699                    );
700                }
701                self.lower_reduce(
702                    input,
703                    group_key,
704                    aggregates,
705                    monotonic,
706                    expected_group_size,
707                    &mut mfp,
708                    false,
709                )?
710            }
711            MirRelationExpr::TopK {
712                input,
713                group_key,
714                order_key,
715                limit,
716                offset,
717                monotonic,
718                expected_group_size,
719            } => {
720                let arity = input.arity();
721                let (input, keys) = self.lower_mir_expr(input)?;
722
723                let top_k_plan = TopKPlan::create_from(
724                    group_key.clone(),
725                    order_key.clone(),
726                    *offset,
727                    limit.clone(),
728                    arity,
729                    *monotonic,
730                    *expected_group_size,
731                );
732
733                // We don't have an MFP here -- install an operator to permute the
734                // input, if necessary.
735                let input = if !keys.raw {
736                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
737                } else {
738                    input
739                };
740                // Return the plan, and no arrangements.
741                let lir_id = self.allocate_lir_id();
742                (
743                    PlanNode::TopK {
744                        input: Box::new(input),
745                        top_k_plan,
746                    }
747                    .as_plan(lir_id),
748                    AvailableCollections::new_raw(),
749                )
750            }
751            MirRelationExpr::Negate { input } => {
752                let arity = input.arity();
753                let (input, keys) = self.lower_mir_expr(input)?;
754
755                // We don't have an MFP here -- install an operator to permute the
756                // input, if necessary.
757                let input = if !keys.raw {
758                    self.arrange_by(input, AvailableCollections::new_raw(), &keys, arity)
759                } else {
760                    input
761                };
762                // Return the plan, and no arrangements.
763                let lir_id = self.allocate_lir_id();
764                (
765                    PlanNode::Negate {
766                        input: Box::new(input),
767                    }
768                    .as_plan(lir_id),
769                    AvailableCollections::new_raw(),
770                )
771            }
772            MirRelationExpr::Threshold { input } => {
773                let (plan, keys) = self.lower_mir_expr(input)?;
774                let arity = keys
775                    .types
776                    .as_ref()
777                    .map(|types| types.len())
778                    .unwrap_or_else(|| input.arity());
779                let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
780                let mut types = keys.types.clone();
781                let plan = if !keys
782                    .arranged
783                    .iter()
784                    .any(|(key, _, _)| key == &required_arrangement.0)
785                {
786                    types = Some(types.unwrap_or_else(|| input.typ().column_types));
787                    self.arrange_by(
788                        plan,
789                        AvailableCollections::new_arranged(
790                            vec![required_arrangement],
791                            types.clone(),
792                        ),
793                        &keys,
794                        arity,
795                    )
796                } else {
797                    plan
798                };
799
800                let output_keys = threshold_plan.keys(types);
801                // Return the plan, and any produced keys.
802                let lir_id = self.allocate_lir_id();
803                (
804                    PlanNode::Threshold {
805                        input: Box::new(plan),
806                        threshold_plan,
807                    }
808                    .as_plan(lir_id),
809                    output_keys,
810                )
811            }
812            MirRelationExpr::Union { base, inputs } => {
813                let arity = base.arity();
814                let mut plans_keys = Vec::with_capacity(1 + inputs.len());
815                let (plan, keys) = self.lower_mir_expr(base)?;
816                plans_keys.push((plan, keys));
817                for input in inputs.iter() {
818                    let (plan, keys) = self.lower_mir_expr(input)?;
819                    plans_keys.push((plan, keys));
820                }
821                let plans = plans_keys
822                    .into_iter()
823                    .map(|(plan, keys)| {
824                        // We don't have an MFP here -- install an operator to permute the
825                        // input, if necessary.
826                        if !keys.raw {
827                            self.arrange_by(plan, AvailableCollections::new_raw(), &keys, arity)
828                        } else {
829                            plan
830                        }
831                    })
832                    .collect();
833                // Return the plan and no arrangements.
834                let lir_id = self.allocate_lir_id();
835                (
836                    PlanNode::Union {
837                        inputs: plans,
838                        consolidate_output: false,
839                    }
840                    .as_plan(lir_id),
841                    AvailableCollections::new_raw(),
842                )
843            }
844            MirRelationExpr::ArrangeBy { input, keys } => {
845                let input_mir = input;
846                let (input, mut input_keys) = self.lower_mir_expr(input)?;
847                // Fill the `types` in `input_keys` if not already present.
848                let input_types = input_keys
849                    .types
850                    .get_or_insert_with(|| input_mir.typ().column_types);
851                let arity = input_types.len();
852
853                // Determine keys that are not present in `input_keys`.
854                let new_keys = keys
855                    .iter()
856                    .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
857                    .cloned()
858                    .collect::<Vec<_>>();
859                if new_keys.is_empty() {
860                    (input, input_keys)
861                } else {
862                    let new_keys = new_keys.iter().cloned().map(|k| {
863                        let (permutation, thinning) = permutation_for_arrangement(&k, arity);
864                        (k, permutation, thinning)
865                    });
866                    let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
867                        input_keys.arbitrary_arrangement()
868                    {
869                        let mut mfp = MapFilterProject::new(arity);
870                        mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
871                        (Some(input_key.clone()), mfp)
872                    } else {
873                        (None, MapFilterProject::new(arity))
874                    };
875                    input_keys.arranged.extend(new_keys);
876                    input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
877
878                    // Return the plan and extended keys.
879                    let lir_id = self.allocate_lir_id();
880                    (
881                        PlanNode::ArrangeBy {
882                            input: Box::new(input),
883                            forms: input_keys.clone(),
884                            input_key,
885                            input_mfp,
886                        }
887                        .as_plan(lir_id),
888                        input_keys,
889                    )
890                }
891            }
892        };
893
894        // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
895        if !mfp.is_identity() {
896            // Seek out an arrangement key that might be constrained to a literal.
897            // TODO: Improve key selection heuristic.
898            let key_val = keys
899                .arranged
900                .iter()
901                .filter_map(|(key, permutation, thinning)| {
902                    let mut mfp = mfp.clone();
903                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
904                    mfp.literal_constraints(key)
905                        .map(|val| (key.clone(), permutation, thinning, val))
906                })
907                .max_by_key(|(key, _, _, _)| key.len());
908
909            // Input key selection strategy:
910            // (1) If we can read a key at a particular value, do so
911            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
912            // therefore allows us to avoid discarding the arrangement, use that.
913            // (3) Otherwise, if there is _some_ key, use that,
914            // (4) Otherwise just read the raw collection.
915            let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
916                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
917
918                Some((key, Some(val)))
919            } else if let Some((key, permutation, thinning)) =
920                keys.arranged.iter().find(|(key, permutation, thinning)| {
921                    let mut mfp = mfp.clone();
922                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
923                    mfp.is_identity()
924                })
925            {
926                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
927                Some((key.clone(), None))
928            } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
929                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
930                Some((key.clone(), None))
931            } else {
932                None
933            };
934
935            if mfp.is_identity() {
936                // We have discovered a key
937                // whose permutation causes the MFP to actually
938                // be the identity! We can keep it around,
939                // but without its permutation this time,
940                // and with a trivial thinning of the right length.
941                let (key, val) = input_key_val.unwrap();
942                let (_old_key, old_permutation, old_thinning) = keys
943                    .arranged
944                    .iter_mut()
945                    .find(|(key2, _, _)| key2 == &key)
946                    .unwrap();
947                *old_permutation = (0..mfp.input_arity).collect();
948                let old_thinned_arity = old_thinning.len();
949                *old_thinning = (0..old_thinned_arity).collect();
950                // Get rid of all other forms, as this is now the only one known to be valid.
951                // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
952                // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
953                keys.arranged.retain(|(key2, _, _)| key2 == &key);
954                keys.raw = false;
955
956                // Creating a Plan::Mfp node is now logically unnecessary, but we
957                // should do so anyway when `val` is populated, so that
958                // the `key_val` optimization gets applied.
959                let lir_id = self.allocate_lir_id();
960                if val.is_some() {
961                    plan = PlanNode::Mfp {
962                        input: Box::new(plan),
963                        mfp,
964                        input_key_val: Some((key, val)),
965                    }
966                    .as_plan(lir_id)
967                }
968            } else {
969                let lir_id = self.allocate_lir_id();
970                plan = PlanNode::Mfp {
971                    input: Box::new(plan),
972                    mfp,
973                    input_key_val,
974                }
975                .as_plan(lir_id);
976                keys = AvailableCollections::new_raw();
977            }
978        }
979
980        Ok((plan, keys))
981    }
982
983    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
984    /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
985    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
986    /// fused.
987    fn lower_reduce<T: Timestamp>(
988        &mut self,
989        input: &MirRelationExpr,
990        group_key: &Vec<MirScalarExpr>,
991        aggregates: &Vec<AggregateExpr>,
992        monotonic: &bool,
993        expected_group_size: &Option<u64>,
994        mfp_on_top: &mut MapFilterProject,
995        fused_unnest_list: bool,
996    ) -> Result<(Plan<T>, AvailableCollections), String> {
997        let input_arity = input.arity();
998        let (input, keys) = self.lower_mir_expr(input)?;
999        let (input_key, permutation_and_new_arity) =
1000            if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
1001                (
1002                    Some(input_key.clone()),
1003                    Some((permutation.clone(), thinning.len() + input_key.len())),
1004                )
1005            } else {
1006                (None, None)
1007            };
1008        let key_val_plan = KeyValPlan::new(
1009            input_arity,
1010            group_key,
1011            aggregates,
1012            permutation_and_new_arity,
1013        );
1014        let reduce_plan = ReducePlan::create_from(
1015            aggregates.clone(),
1016            *monotonic,
1017            *expected_group_size,
1018            fused_unnest_list,
1019        );
1020        // Return the plan, and the keys it produces.
1021        let mfp_after;
1022        let output_arity;
1023        if self.enable_reduce_mfp_fusion {
1024            (mfp_after, *mfp_on_top, output_arity) =
1025                reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1026        } else {
1027            (mfp_after, output_arity) = (
1028                MapFilterProject::new(mfp_on_top.input_arity),
1029                group_key.len() + aggregates.len(),
1030            );
1031        }
1032        soft_assert_eq_or_log!(
1033            mfp_on_top.input_arity,
1034            output_arity,
1035            "Output arity of reduce must match input arity for MFP on top of it"
1036        );
1037        let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1038        let lir_id = self.allocate_lir_id();
1039        Ok((
1040            PlanNode::Reduce {
1041                input: Box::new(input),
1042                key_val_plan,
1043                plan: reduce_plan,
1044                input_key,
1045                mfp_after,
1046            }
1047            .as_plan(lir_id),
1048            output_keys,
1049        ))
1050    }
1051
1052    /// Replace the plan with another one
1053    /// that has the collection in some additional forms.
1054    pub fn arrange_by<T>(
1055        &mut self,
1056        plan: Plan<T>,
1057        collections: AvailableCollections,
1058        old_collections: &AvailableCollections,
1059        arity: usize,
1060    ) -> Plan<T> {
1061        if let Plan {
1062            node:
1063                PlanNode::ArrangeBy {
1064                    input,
1065                    mut forms,
1066                    input_key,
1067                    input_mfp,
1068                },
1069            lir_id,
1070        } = plan
1071        {
1072            forms.raw |= collections.raw;
1073            forms.arranged.extend(collections.arranged);
1074            forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1075            forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1076            if forms.types.is_none() {
1077                forms.types = collections.types;
1078            } else {
1079                assert!(collections.types.is_none() || collections.types == forms.types);
1080            }
1081            PlanNode::ArrangeBy {
1082                input,
1083                forms,
1084                input_key,
1085                input_mfp,
1086            }
1087            .as_plan(lir_id)
1088        } else {
1089            let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1090                old_collections.arbitrary_arrangement()
1091            {
1092                let mut mfp = MapFilterProject::new(arity);
1093                mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1094                (Some(input_key.clone()), mfp)
1095            } else {
1096                (None, MapFilterProject::new(arity))
1097            };
1098            let lir_id = self.allocate_lir_id();
1099            PlanNode::ArrangeBy {
1100                input: Box::new(plan),
1101                forms: collections,
1102                input_key,
1103                input_mfp,
1104            }
1105            .as_plan(lir_id)
1106        }
1107    }
1108}
1109
1110/// Various bits of state to print along with error messages during LIR planning,
1111/// to aid debugging.
1112#[derive(Clone, Debug)]
1113pub struct LirDebugInfo {
1114    debug_name: String,
1115    id: GlobalId,
1116}
1117
1118impl std::fmt::Display for LirDebugInfo {
1119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1120        write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1121    }
1122}