Skip to main content

mz_compute_types/plan/
lowering.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18    AggregateExpr, Columns, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19    OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::optimize::OptimizerFeatures;
23use mz_repr::{GlobalId, Timestamp};
24
25use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
26use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
27use crate::plan::reduce::{KeyValPlan, ReducePlan};
28use crate::plan::threshold::ThresholdPlan;
29use crate::plan::top_k::TopKPlan;
30use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Plan, PlanNode};
31
32/// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped
33/// updates. Future updates are the only case where temporal bucketing pays off.
34///
35/// Any arrangement or consolidation that absorbs data that can have future updates should be
36/// guarded by a temporal bucketing operator.
37fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy {
38    if has_future_updates {
39        ArrangementStrategy::TemporalBucketing
40    } else {
41        ArrangementStrategy::Direct
42    }
43}
44
/// The result of lowering a [`MirRelationExpr`] to a [`Plan`].
///
/// Produced by `Context::lower_mir_expr`; callers destructure it to thread the plan, the
/// available arrangements, and the future-updates flag onward independently.
struct LoweredExpr {
    /// The lowered plan.
    plan: Plan,
    /// The arrangement keys that the plan is certain to produce.
    keys: AvailableCollections,
    /// Whether the plan's output may contain updates at future timestamps,
    /// e.g., from a temporal MFP using `mz_now()`.
    ///
    /// Consumers pass this flag to `strategy_from_future` when they need to pick an
    /// [`ArrangementStrategy`] for an arrangement built on top of this plan.
    has_future_updates: bool,
}
55
/// State carried while lowering a [`DataflowDescription`] from MIR to LIR.
///
/// A `Context` is created with `Context::new` and consumed by `Context::lower`, which
/// lowers every object of the dataflow in order.
pub(super) struct Context {
    /// Known bindings to (possibly arranged) collections.
    ///
    /// Seeded from the dataflow's index and source imports, extended with each object as it
    /// is built, and temporarily extended with local ids while lowering `Let`/`LetRec`.
    arrangements: BTreeMap<Id, AvailableCollections>,
    /// Ids whose collections may contain updates at future timestamps,
    /// e.g., from a temporal MFP using `mz_now()`.
    has_future_updates: BTreeSet<Id>,
    /// Tracks the next available `LirId`.
    next_lir_id: LirId,
    /// Information to print along with error messages.
    ///
    /// The contained id is updated to the object currently being lowered.
    debug_info: LirDebugInfo,
    /// Whether to enable fusion of MFPs in reductions.
    ///
    /// Copied from the [`OptimizerFeatures`] passed at construction.
    enable_reduce_mfp_fusion: bool,
}
69
70impl Context {
71    pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
72        Self {
73            arrangements: Default::default(),
74            has_future_updates: Default::default(),
75            next_lir_id: LirId(1),
76            debug_info: LirDebugInfo {
77                debug_name,
78                id: GlobalId::Transient(0),
79            },
80            enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
81        }
82    }
83
84    fn allocate_lir_id(&mut self) -> LirId {
85        let id = self.next_lir_id;
86        self.next_lir_id = LirId(
87            self.next_lir_id
88                .0
89                .checked_add(1)
90                .expect("No LirId overflow"),
91        );
92        id
93    }
94
95    pub fn lower(
96        mut self,
97        desc: DataflowDescription<OptimizedMirRelationExpr>,
98    ) -> Result<DataflowDescription<Plan>, String> {
99        // Sources might provide arranged forms of their data, in the future.
100        // Indexes provide arranged forms of their data.
101        for IndexImport {
102            desc: index_desc,
103            typ,
104            ..
105        } in desc.index_imports.values()
106        {
107            let key = index_desc.key.clone();
108            // TODO[btv] - We should be told the permutation by
109            // `index_desc`, and it should have been generated
110            // at the same point the thinning logic was.
111            //
112            // We should for sure do that soon, but it requires
113            // a bit of a refactor, so for now we just
114            // _assume_ that they were both generated by `permutation_for_arrangement`,
115            // and recover it here.
116            let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
117            let index_keys = self
118                .arrangements
119                .entry(Id::Global(index_desc.on_id))
120                .or_insert_with(AvailableCollections::default);
121            index_keys.arranged.push((key, permutation, thinning));
122        }
123        for id in desc.source_imports.keys() {
124            self.arrangements
125                .entry(Id::Global(*id))
126                .or_insert_with(AvailableCollections::new_raw);
127        }
128
129        // Build each object in order, registering the arrangements it forms.
130        let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
131        for build in desc.objects_to_build {
132            self.debug_info.id = build.id;
133            let LoweredExpr {
134                plan,
135                keys,
136                has_future_updates,
137            } = self.lower_mir_expr(&build.plan)?;
138
139            self.arrangements.insert(Id::Global(build.id), keys);
140            if has_future_updates {
141                self.has_future_updates.insert(Id::Global(build.id));
142            }
143            objects_to_build.push(BuildDesc { id: build.id, plan });
144        }
145
146        Ok(DataflowDescription {
147            source_imports: desc.source_imports,
148            index_imports: desc.index_imports,
149            objects_to_build,
150            index_exports: desc.index_exports,
151            sink_exports: desc.sink_exports,
152            as_of: desc.as_of,
153            until: desc.until,
154            initial_storage_as_of: desc.initial_storage_as_of,
155            refresh_schedule: desc.refresh_schedule,
156            debug_name: desc.debug_name,
157            time_dependence: desc.time_dependence,
158        })
159    }
160
    /// Converts a [`MirRelationExpr`] into a plan that can be directly rendered.
    ///
    /// The rough structure is that we repeatedly extract map/filter/project operators
    /// from each expression we see, bundle them up as a `MapFilterProject` object, and
    /// then produce a plan for the combination of that with the next operator.
    ///
    /// The method accesses `self.arrangements` and `self.has_future_updates`, which it will
    /// locally add to and remove from for `Let` bindings (by the end of the call they should
    /// contain the same bindings as when it started).
    ///
    /// The result bundles the `Plan` with the arrangements that are certain to be produced,
    /// which can be relied on by the next steps in the plan, and a flag telling whether the
    /// output may contain updates at future timestamps.
    /// Each of the arrangement keys is associated with an MFP that must be applied if that
    /// arrangement is used, to back out the permutation associated with that arrangement.
    ///
    /// An empty list of arrangement keys indicates that only a `Collection` stream can
    /// be assumed to exist.
    ///
    /// This is a stack-growing wrapper; the actual work happens in
    /// `lower_mir_expr_stack_safe`.
    fn lower_mir_expr(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
        // This function is recursive and can overflow its stack, so grow it if
        // needed. The growth here is unbounded. Our general solution for this problem
        // is to use [`mz_ore::stack::RecursionGuard`] to additionally limit the stack
        // depth. That however requires upstream error handling. This function is
        // currently called by the Coordinator after calls to `catalog_transact`,
        // and thus is not allowed to fail. Until that allows errors, we choose
        // to allow the unbounded growth here. We are, though, somewhat protected by
        // higher levels enforcing their own limits on stack depth (in the parser,
        // transformer/desugarer, and planner).
        mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
    }
190
191    fn lower_mir_expr_stack_safe(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
192        // Extract a maximally large MapFilterProject from `expr`.
193        // We will then try and push this in to the resulting expression.
194        //
195        // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
196        // While we would eventually like all plan stages to be able to absorb such
197        // general operators, not all of them can.
198        let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
199        // We attempt to plan what we have remaining, in the context of `mfp`.
200        // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
201        let LoweredExpr {
202            mut plan,
203            mut keys,
204            mut has_future_updates,
205        } = match expr {
206            // These operators should have been extracted from the expression.
207            MirRelationExpr::Map { .. } => {
208                panic!("This operator should have been extracted");
209            }
210            MirRelationExpr::Filter { .. } => {
211                panic!("This operator should have been extracted");
212            }
213            MirRelationExpr::Project { .. } => {
214                panic!("This operator should have been extracted");
215            }
216            // These operators may not have been extracted, and need to result in a `Plan`.
217            MirRelationExpr::Constant { rows, typ: _ } => {
218                let lir_id = self.allocate_lir_id();
219                let node = PlanNode::Constant {
220                    rows: rows.clone().map(|rows| {
221                        rows.into_iter()
222                            .map(|(row, diff)| (row, Timestamp::MIN, diff))
223                            .collect()
224                    }),
225                };
226                // The plan, not arranged in any way.
227                LoweredExpr {
228                    plan: node.as_plan(lir_id),
229                    keys: AvailableCollections::new_raw(),
230                    has_future_updates: false,
231                }
232            }
233            MirRelationExpr::Get { id, typ: _, .. } => {
234                // This stage can absorb arbitrary MFP operators.
235                let mut mfp = mfp.take();
236                // If `mfp` is the identity, we can surface all imported arrangements.
237                // Otherwise, we apply `mfp` and promise no arrangements.
238                let mut in_keys = self
239                    .arrangements
240                    .get(id)
241                    .cloned()
242                    .unwrap_or_else(AvailableCollections::new_raw);
243
244                // Seek out an arrangement key that might be constrained to a literal.
245                // Note: this code has very little use nowadays, as its job was mostly taken over
246                // by `LiteralConstraints` (see in the below longer comment).
247                let key_val = in_keys
248                    .arranged
249                    .iter()
250                    .filter_map(|key| {
251                        mfp.literal_constraints(&key.0)
252                            .map(|val| (key.clone(), val))
253                    })
254                    .max_by_key(|(key, _val)| key.0.len());
255
256                // Determine the plan of action for the `Get` stage.
257                let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
258                    // This code path used to handle looking up literals from indexes, but it's
259                    // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
260                    // MIR transform instead. However, it's still called in a couple of tricky
261                    // special cases:
262                    // - `LiteralConstraints` handles only Gets of global ids, so this code still
263                    //   gets to handle Filters on top of Gets of local ids.
264                    // - Lowering does a `MapFilterProject::extract_from_expression`, while
265                    //   `LiteralConstraints` does
266                    //   `MapFilterProject::extract_non_errors_from_expr_mut`.
267                    // - It might happen that new literal constraint optimization opportunities
268                    //   appear somewhere near the end of the MIR optimizer after
269                    //   `LiteralConstraints` has already run.
270                    // (Also note that a similar literal constraint handling machinery is also
271                    // present when handling the leftover MFP after this big match.)
272                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
273                    in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
274                    GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
275                } else if !mfp.is_identity() {
276                    // We need to ensure a collection exists, which means we must form it.
277                    if let Some((key, permutation, thinning)) =
278                        in_keys.arbitrary_arrangement().cloned()
279                    {
280                        mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
281                        in_keys.arranged = vec![(key.clone(), permutation, thinning)];
282                        GetPlan::Arrangement(key, None, mfp)
283                    } else {
284                        GetPlan::Collection(mfp)
285                    }
286                } else {
287                    // By default, just pass input arrangements through.
288                    GetPlan::PassArrangements
289                };
290
291                let out_keys = if let GetPlan::PassArrangements = plan {
292                    in_keys.clone()
293                } else {
294                    AvailableCollections::new_raw()
295                };
296
297                // Even with a non-temporal MFP, we must propagate `has_future_updates`
298                // from the underlying binding — applying an MFP doesn't drop future-
299                // timestamped updates that already exist on the input.
300                //
301                // Note that global Gets from different dataflows can't have future updates, because
302                // both indexes and materialized views hold back future updates.
303                let has_future_updates = self.has_future_updates.contains(id)
304                    || match &plan {
305                        GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => {
306                            mfp.has_temporal_predicates()
307                        }
308                        GetPlan::PassArrangements => false,
309                    };
310
311                let lir_id = self.allocate_lir_id();
312                let node = PlanNode::Get {
313                    id: id.clone(),
314                    keys: in_keys,
315                    plan,
316                };
317                // Return the plan, and any keys if an identity `mfp`.
318                LoweredExpr {
319                    plan: node.as_plan(lir_id),
320                    keys: out_keys,
321                    has_future_updates,
322                }
323            }
324            MirRelationExpr::Let { id, value, body } => {
325                // It would be unfortunate to have a non-trivial `mfp` here, as we hope
326                // that they would be pushed down. I am not sure if we should take the
327                // initiative to push down the `mfp` ourselves.
328
329                // Plan the value using only the initial arrangements, but
330                // introduce any resulting arrangements bound to `id`.
331                let LoweredExpr {
332                    plan: value,
333                    keys: v_keys,
334                    has_future_updates: v_future,
335                } = self.lower_mir_expr(value)?;
336                let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
337                assert_none!(pre_existing);
338                if v_future {
339                    self.has_future_updates.insert(Id::Local(*id));
340                }
341                // Plan the body using initial and `value` arrangements,
342                // and then remove reference to the value arrangements.
343                let LoweredExpr {
344                    plan: body,
345                    keys: b_keys,
346                    has_future_updates: b_future,
347                } = self.lower_mir_expr(body)?;
348                self.arrangements.remove(&Id::Local(*id));
349                self.has_future_updates.remove(&Id::Local(*id));
350                // Return the plan, and any `body` arrangements.
351                let lir_id = self.allocate_lir_id();
352                LoweredExpr {
353                    plan: PlanNode::Let {
354                        id: id.clone(),
355                        value: Box::new(value),
356                        body: Box::new(body),
357                    }
358                    .as_plan(lir_id),
359                    keys: b_keys,
360                    has_future_updates: b_future,
361                }
362            }
363            MirRelationExpr::LetRec {
364                ids,
365                values,
366                limits,
367                body,
368            } => {
369                assert_eq!(ids.len(), values.len());
370                assert_eq!(ids.len(), limits.len());
371                // Plan the values using only the available arrangements, but
372                // introduce any resulting arrangements bound to each `id`.
373                // Arrangements made available cannot be used by prior bindings,
374                // as we cannot circulate an arrangement through a `Variable` yet.
375                let mut lir_values = Vec::with_capacity(values.len());
376                let mut any_v_future = false;
377                for (id, value) in ids.iter().zip_eq(values) {
378                    let LoweredExpr {
379                        plan: mut lir_value,
380                        keys: mut v_keys,
381                        has_future_updates: v_future,
382                    } = self.lower_mir_expr(value)?;
383                    any_v_future |= v_future;
384                    // If `v_keys` does not contain an unarranged collection, we must form it.
385                    if !v_keys.raw {
386                        // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
387                        let (input_key, permutation, thinning) =
388                            v_keys.arbitrary_arrangement().unwrap();
389                        let mut input_mfp = MapFilterProject::new(value.arity());
390                        input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
391                        let input_key = Some(input_key.clone());
392
393                        let forms = AvailableCollections::new_raw();
394
395                        // We just want to insert an `ArrangeBy` to form an unarranged collection,
396                        // but there is a complication: We shouldn't break the invariant (created by
397                        // `NormalizeLets`, and relied upon by the rendering) that there isn't
398                        // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
399                        // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
400                        // instead of on top of the inner `LetRec`.
401                        //
402                        // We forward `v_future` for honesty; bucketing has no observable effect
403                        // inside an iterative scope, but the field should reflect reality.
404                        lir_value = match lir_value {
405                            Plan {
406                                node:
407                                    PlanNode::LetRec {
408                                        ids,
409                                        values,
410                                        limits,
411                                        body,
412                                    },
413                                lir_id,
414                            } => {
415                                let inner_lir_id = self.allocate_lir_id();
416                                PlanNode::LetRec {
417                                    ids,
418                                    values,
419                                    limits,
420                                    body: Box::new(
421                                        PlanNode::ArrangeBy {
422                                            input_key,
423                                            input: body,
424                                            input_mfp,
425                                            forms,
426                                            strategy: strategy_from_future(v_future),
427                                        }
428                                        .as_plan(inner_lir_id),
429                                    ),
430                                }
431                                .as_plan(lir_id)
432                            }
433                            lir_value => {
434                                let lir_id = self.allocate_lir_id();
435                                PlanNode::ArrangeBy {
436                                    input_key,
437                                    input: Box::new(lir_value),
438                                    input_mfp,
439                                    forms,
440                                    strategy: strategy_from_future(v_future),
441                                }
442                                .as_plan(lir_id)
443                            }
444                        };
445                        v_keys.raw = true;
446                    }
447                    let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
448                    assert_none!(pre_existing);
449                    if v_future {
450                        self.has_future_updates.insert(Id::Local(*id));
451                    }
452                    lir_values.push(lir_value);
453                }
454                // As we exit the iterative scope, we must leave all arrangements behind,
455                // as they reference a timestamp coordinate that must be stripped off.
456                for id in ids.iter() {
457                    self.arrangements
458                        .insert(Id::Local(*id), AvailableCollections::new_raw());
459                }
460                // Plan the body using initial and `value` arrangements,
461                // and then remove reference to the value arrangements.
462                let LoweredExpr {
463                    plan: body,
464                    keys: b_keys,
465                    has_future_updates: b_future,
466                } = self.lower_mir_expr(body)?;
467                for id in ids.iter() {
468                    self.arrangements.remove(&Id::Local(*id));
469                    self.has_future_updates.remove(&Id::Local(*id));
470                }
471                // Return the plan, and any `body` arrangements.
472                //
473                // The body's `b_future` alone can under-report: an earlier binding may only
474                // inherit `has_future_updates` via a Variable to a *later* binding, which the
475                // sequential sweep can't observe at the time the earlier binding is lowered.
476                // A precise fix would require a fixpoint (or the MIR `Analysis` framework with
477                // a `true ⊑ false` lattice). As a cheap correct alternative, OR with the
478                // bindings' future flags: any cross-binding propagation must originate from a
479                // local temporal predicate inside *some* binding, so the OR captures it
480                // without forcing bucketing on a fully non-temporal LetRec.
481                let lir_id = self.allocate_lir_id();
482                LoweredExpr {
483                    plan: PlanNode::LetRec {
484                        ids: ids.clone(),
485                        values: lir_values,
486                        limits: limits.clone(),
487                        body: Box::new(body),
488                    }
489                    .as_plan(lir_id),
490                    keys: b_keys,
491                    has_future_updates: b_future || any_v_future,
492                }
493            }
494            MirRelationExpr::FlatMap {
495                input: flat_map_input,
496                func,
497                exprs,
498            } => {
499                // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
500                // fused into the lowered `Reduce`.
501                //
502                // In theory, we could have implemented this also as an MIR transform. However, this
503                // is more of a physical optimization, which are sometimes unpleasant to make a part
504                // of the MIR pipeline. The specific problem here with putting this into the MIR
505                // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
506                // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
507                // emit multiple rows per group. Such semantic changes of MIR are very scary, since
508                // various parts of the optimizer assume that Reduce emits only 1 row per group, and
509                // it would be very hard to hunt down all these parts. (For example, key inference
510                // infers the group key as a unique key.)
511                let fused_with_reduce = 'fusion: {
512                    if !matches!(func, TableFunc::UnnestList { .. }) {
513                        break 'fusion None;
514                    }
515                    // We might have a Project of a single col between the FlatMap and the
516                    // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
517                    // result of the window function.)
518                    let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
519                        input: project_input,
520                        outputs: projection,
521                    } = &**flat_map_input
522                    {
523                        // We want this to be a single column, because we'll want to deal with only
524                        // one aggregation in the `Reduce`. (The aggregation of a window function
525                        // always stands alone currently: we plan them separately from other
526                        // aggregations, and Reduces are never fused. When window functions are
527                        // fused with each other, they end up in one aggregation. When there are
528                        // multiple window functions in the same SELECT, but can't be fused, they
529                        // end up in different Reduces.)
530                        if let &[single_col] = &**projection {
531                            (project_input, single_col)
532                        } else {
533                            break 'fusion None;
534                        }
535                    } else {
536                        (flat_map_input, 0)
537                    };
538                    if let MirRelationExpr::Reduce {
539                        input,
540                        group_key,
541                        aggregates,
542                        monotonic,
543                        expected_group_size,
544                    } = &**maybe_reduce
545                    {
546                        if group_key.len() != num_grouping_keys
547                            || aggregates.len() != 1
548                            || !aggregates[0].func.can_fuse_with_unnest_list()
549                        {
550                            break 'fusion None;
551                        }
552                        // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
553                        // above the FlatMap. Later, we'll mutate this to be the residual MFP that
554                        // didn't get fused into the `Reduce`.
555                        let non_fused_mfp_above_flat_map = &mut mfp;
556                        let reduce_output_arity = num_grouping_keys + 1;
557                        // We are fusing away the list that the FlatMap would have been unnesting,
558                        // so the column that had that list disappears, so we have to permute the
559                        // MFP above the FlatMap with this column disappearance.
560                        let tweaked_mfp = {
561                            let mut mfp = non_fused_mfp_above_flat_map.clone();
562                            if mfp.demand().contains(&0) {
563                                // I don't think this can happen currently that this MFP would
564                                // refer to the list column, because both the list column and the
565                                // MFP were constructed by the HIR-to-MIR lowering, so it's not just
566                                // some random MFP that we are seeing here. But anyhow, it's better
567                                // to check this here for robustness against future code changes.
568                                break 'fusion None;
569                            }
570                            let permutation: BTreeMap<_, _> =
571                                (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
572                            mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
573                            mfp
574                        };
575                        // We now put together the project that was before the FlatMap, and the
576                        // tweaked version of the MFP that was after the FlatMap.
577                        // (Part of this MFP might be fused into the Reduce.)
578                        let mut project_and_tweaked_mfp = {
579                            let mut mfp = MapFilterProject::new(reduce_output_arity);
580                            mfp = mfp.project(vec![num_grouping_keys]);
581                            mfp = MapFilterProject::compose(mfp, tweaked_mfp);
582                            mfp
583                        };
584                        let fused = self.lower_reduce(
585                            input,
586                            group_key,
587                            aggregates,
588                            monotonic,
589                            expected_group_size,
590                            &mut project_and_tweaked_mfp,
591                            true,
592                        )?;
593                        // Update the residual MFP.
594                        *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
595                        Some(fused)
596                    } else {
597                        break 'fusion None;
598                    }
599                };
600                if let Some(fused_with_reduce) = fused_with_reduce {
601                    fused_with_reduce
602                } else {
603                    // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
604                    let LoweredExpr {
605                        plan: input,
606                        keys,
607                        has_future_updates: input_future,
608                    } = self.lower_mir_expr(flat_map_input)?;
609                    // This stage can absorb arbitrary MFP instances.
610                    let mut mfp = mfp.take();
611                    let mut exprs = exprs.clone();
612                    // Prefer the unarranged collection when present: it presents input columns
613                    // in logical order, so no permutation is required.
614                    let input_key = if keys.raw {
615                        None
616                    } else if let Some((k, permutation, thinning)) = keys.arbitrary_arrangement() {
617                        // Reading from this arrangement exposes input columns in arrangement
618                        // order (key columns followed by thinned value columns). We must
619                        // permute every reference to an input column accordingly: the
620                        // `expr`s feeding the table function arguments, and the `mfp` running
621                        // after the table function (which still references input columns at
622                        // positions `0..input_arity`).
623                        //
624                        // The renderer hands the `mfp` the *whole* arranged row and appends the
625                        // table-function output after it. The arranged row can be wider than the
626                        // logical input row when the key is not a set of distinct columns (an
627                        // expression, functional, or repeated-column key carries extra key
628                        // values). So the table-function output columns at positions
629                        // `input_arity..` must be shifted to land after the arranged row, and the
630                        // `mfp`'s new input arity must reflect the arranged width.
631                        for expr in &mut exprs {
632                            expr.permute(permutation);
633                        }
634                        let input_arity = permutation.len();
635                        let arranged_arity = thinning.len() + k.len();
636                        let output_arity = mfp.input_arity - input_arity;
637                        mfp.permute_fn(
638                            |c| {
639                                if c < input_arity {
640                                    permutation[c]
641                                } else {
642                                    arranged_arity + (c - input_arity)
643                                }
644                            },
645                            arranged_arity + output_arity,
646                        );
647                        Some(k.clone())
648                    } else {
649                        None
650                    };
651
652                    let lir_id = self.allocate_lir_id();
653                    // The absorbed `mfp` may contain temporal predicates, which can
654                    // introduce future-stamped updates that aren't present on the input.
655                    let has_future_updates = input_future || mfp.has_temporal_predicates();
656                    // Return the plan, and no arrangements.
657                    LoweredExpr {
658                        plan: PlanNode::FlatMap {
659                            input_key,
660                            input: Box::new(input),
661                            exprs,
662                            func: func.clone(),
663                            mfp_after: mfp,
664                        }
665                        .as_plan(lir_id),
666                        keys: AvailableCollections::new_raw(),
667                        has_future_updates,
668                    }
669                }
670            }
671            MirRelationExpr::Join {
672                inputs,
673                equivalences,
674                implementation,
675            } => {
676                // Plan each of the join inputs independently.
677                // The `plans` get surfaced upwards, and the `input_keys` should
678                // be used as part of join planning / to validate the existing
679                // plans / to aid in indexed seeding of update streams.
680                let mut plans = Vec::new();
681                let mut input_keys = Vec::new();
682                let mut input_arities = Vec::new();
683                let mut input_futures = Vec::new();
684                for input in inputs.iter() {
685                    let LoweredExpr {
686                        plan,
687                        keys,
688                        has_future_updates: input_future,
689                    } = self.lower_mir_expr(input)?;
690                    input_arities.push(input.arity());
691                    plans.push(plan);
692                    input_keys.push(keys);
693                    input_futures.push(input_future);
694                }
695                let any_input_future = input_futures.iter().any(|&f| f);
696
697                let input_mapper =
698                    JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
699
700                // Extract temporal predicates as joins cannot currently absorb them.
701                let (plan, missing) = match implementation {
702                    IndexedFilter(_coll_id, _idx_id, key, _val) => {
703                        // Start with the constant input. (This used to be important before database-issues#4016
704                        // was fixed.)
705                        let start: usize = 1;
706                        let order = vec![(0usize, key.clone(), None)];
707                        // All columns of the constant input will be part of the arrangement key.
708                        let source_arrangement = (
709                            (0..key.len())
710                                .map(MirScalarExpr::column)
711                                .collect::<Vec<_>>(),
712                            (0..key.len()).collect::<Vec<_>>(),
713                            Vec::<usize>::new(),
714                        );
715                        let (ljp, missing) = LinearJoinPlan::create_from(
716                            start,
717                            Some(&source_arrangement),
718                            equivalences,
719                            &order,
720                            input_mapper,
721                            &mut mfp,
722                            &input_keys,
723                        );
724                        (JoinPlan::Linear(ljp), missing)
725                    }
726                    Differential((start, start_arr, _start_characteristic), order) => {
727                        let source_arrangement = start_arr.as_ref().and_then(|key| {
728                            input_keys[*start]
729                                .arranged
730                                .iter()
731                                .find(|(k, _, _)| k == key)
732                                .clone()
733                        });
734                        let (ljp, missing) = LinearJoinPlan::create_from(
735                            *start,
736                            source_arrangement,
737                            equivalences,
738                            order,
739                            input_mapper,
740                            &mut mfp,
741                            &input_keys,
742                        );
743                        (JoinPlan::Linear(ljp), missing)
744                    }
745                    DeltaQuery(orders) => {
746                        let (djp, missing) = DeltaJoinPlan::create_from(
747                            equivalences,
748                            orders,
749                            input_mapper,
750                            &mut mfp,
751                            &input_keys,
752                        );
753                        (JoinPlan::Delta(djp), missing)
754                    }
755                    // Other plans are errors, and should be reported as such.
756                    Unimplemented => return Err("unimplemented join".to_string()),
757                };
758                // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
759                // `missing`. We thus need to plan them here so they'll exist.
760                let is_delta = matches!(plan, JoinPlan::Delta(_));
761                for ((((input_plan, input_keys), missing), arity), input_future) in plans
762                    .iter_mut()
763                    .zip_eq(input_keys.iter())
764                    .zip_eq(missing)
765                    .zip_eq(input_arities.iter().cloned())
766                    .zip_eq(input_futures.iter().copied())
767                {
768                    if missing != Default::default() {
769                        if is_delta {
770                            // join_implementation.rs produced a sub-optimal plan here;
771                            // we shouldn't plan delta joins at all if not all of the required
772                            // arrangements are available. Soft panic in CI and log an error in
773                            // production to increase the chances that we will catch all situations
774                            // that violate this constraint.
775                            soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
776Dataflow info: {}
777This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
778                        } else {
779                            soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
780Dataflow info: {}
781This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
782                            // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
783                            // Join input. (Earlier, they were missing in the following cases:
784                            //  - They were const-folded away for constant inputs. This is not
785                            //    happening since
786                            //    https://github.com/MaterializeInc/materialize/pull/16351
787                            //  - They were not being inserted for the constant input of
788                            //    `IndexedFilter`s. This was fixed in
789                            //    https://github.com/MaterializeInc/materialize/pull/20920
790                            //  - They were not being inserted for the first input of Differential
791                            //    joins. This was fixed in
792                            //    https://github.com/MaterializeInc/materialize/pull/16099)
793                        }
794                        let lir_id = self.allocate_lir_id();
795                        let raw_plan = std::mem::replace(
796                            input_plan,
797                            PlanNode::Constant {
798                                rows: Ok(Vec::new()),
799                            }
800                            .as_plan(lir_id),
801                        );
802                        *input_plan =
803                            self.arrange_by(raw_plan, missing, input_keys, arity, input_future);
804                    }
805                }
806                // Return the plan, and no arrangements.
807                // Both linear and delta join planning extract temporal predicates back into the
808                // residual `mfp` (see `LinearJoinPlan::create_from` / `DeltaJoinPlan::create_from`),
809                // so the absorbed MFP cannot introduce future updates — the join's output future
810                // flag is just the OR of its inputs.
811                let lir_id = self.allocate_lir_id();
812                LoweredExpr {
813                    plan: PlanNode::Join {
814                        inputs: plans,
815                        plan,
816                    }
817                    .as_plan(lir_id),
818                    keys: AvailableCollections::new_raw(),
819                    has_future_updates: any_input_future,
820                }
821            }
822            MirRelationExpr::Reduce {
823                input,
824                group_key,
825                aggregates,
826                monotonic,
827                expected_group_size,
828            } => {
829                if aggregates
830                    .iter()
831                    .any(|agg| agg.func.can_fuse_with_unnest_list())
832                {
833                    // This case should have been handled at the `MirRelationExpr::FlatMap` case
834                    // above. But that has a pretty complicated pattern matching, so it's not
835                    // unthinkable that it fails.
836                    soft_panic_or_log!(
837                        "Window function performance issue: `reduce_unnest_list_fusion` failed"
838                    );
839                }
840                self.lower_reduce(
841                    input,
842                    group_key,
843                    aggregates,
844                    monotonic,
845                    expected_group_size,
846                    &mut mfp,
847                    false,
848                )?
849            }
850            MirRelationExpr::TopK {
851                input,
852                group_key,
853                order_key,
854                limit,
855                offset,
856                monotonic,
857                expected_group_size,
858            } => {
859                let arity = input.arity();
860                let LoweredExpr {
861                    plan: input,
862                    keys,
863                    has_future_updates: input_future,
864                } = self.lower_mir_expr(input)?;
865
866                let top_k_plan = TopKPlan::create_from(
867                    group_key.clone(),
868                    order_key.clone(),
869                    *offset,
870                    limit.clone(),
871                    arity,
872                    *monotonic,
873                    *expected_group_size,
874                );
875
876                // We don't have an MFP here -- install an operator to permute the
877                // input, if necessary.
878                let input = if !keys.raw {
879                    self.arrange_by(
880                        input,
881                        AvailableCollections::new_raw(),
882                        &keys,
883                        arity,
884                        // `new_raw` means no arrangement, so no bucketing is needed
885                        false,
886                    )
887                } else {
888                    input
889                };
890                // Return the plan, and no arrangements.
891                let temporal_bucketing_strategy = strategy_from_future(input_future);
892                let lir_id = self.allocate_lir_id();
893                LoweredExpr {
894                    plan: PlanNode::TopK {
895                        input: Box::new(input),
896                        top_k_plan,
897                        temporal_bucketing_strategy,
898                    }
899                    .as_plan(lir_id),
900                    keys: AvailableCollections::new_raw(),
901                    has_future_updates: false,
902                }
903            }
904            MirRelationExpr::Negate { input } => {
905                let arity = input.arity();
906                let LoweredExpr {
907                    plan: input,
908                    keys,
909                    has_future_updates: input_future,
910                } = self.lower_mir_expr(input)?;
911
912                // We don't have an MFP here -- install an operator to permute the
913                // input, if necessary.
914                let input = if !keys.raw {
915                    self.arrange_by(
916                        input,
917                        AvailableCollections::new_raw(),
918                        &keys,
919                        arity,
920                        // `new_raw` means no arrangement, so no bucketing is needed
921                        false,
922                    )
923                } else {
924                    input
925                };
926                // Return the plan, and no arrangements.
927                let lir_id = self.allocate_lir_id();
928                LoweredExpr {
929                    plan: PlanNode::Negate {
930                        input: Box::new(input),
931                    }
932                    .as_plan(lir_id),
933                    keys: AvailableCollections::new_raw(),
934                    has_future_updates: input_future,
935                }
936            }
937            MirRelationExpr::Threshold { input } => {
938                let LoweredExpr {
939                    plan,
940                    keys,
941                    has_future_updates: input_future,
942                } = self.lower_mir_expr(input)?;
943                let arity = input.arity();
944                let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
945
946                let plan = if !keys
947                    .arranged
948                    .iter()
949                    .any(|(key, _, _)| key == &required_arrangement.0)
950                {
951                    self.arrange_by(
952                        plan,
953                        AvailableCollections::new_arranged(vec![required_arrangement]),
954                        &keys,
955                        arity,
956                        input_future,
957                    )
958                } else {
959                    plan
960                };
961
962                let output_keys = threshold_plan.keys();
963                // Return the plan, and any produced keys.
964                let lir_id = self.allocate_lir_id();
965                LoweredExpr {
966                    plan: PlanNode::Threshold {
967                        input: Box::new(plan),
968                        threshold_plan,
969                    }
970                    .as_plan(lir_id),
971                    keys: output_keys,
972                    // Threshold builds its own output arrangement whose
973                    // MergeBatcher absorbs future-stamped updates, so no
974                    // future updates flow out.
975                    has_future_updates: false,
976                }
977            }
978            MirRelationExpr::Union { base, inputs } => {
979                let arity = base.arity();
980                let mut lowered_inputs = Vec::with_capacity(1 + inputs.len());
981                lowered_inputs.push(self.lower_mir_expr(base)?);
982                for input in inputs.iter() {
983                    lowered_inputs.push(self.lower_mir_expr(input)?);
984                }
985
986                // A Union with any `Negate` input should consolidate its
987                // output. The lowering is the only place where this decision
988                // can be coupled with the per-input bucketing strategy.
989                let consolidate_output = lowered_inputs
990                    .iter()
991                    .any(|l| matches!(l.plan.node, PlanNode::Negate { .. }));
992
993                // Per-input bucketing strategies: only meaningful when the
994                // Union consolidates its output, since bucketing only pays off
995                // ahead of a downstream consolidator.
996                let temporal_bucketing_strategies: Vec<ArrangementStrategy> = if consolidate_output
997                {
998                    lowered_inputs
999                        .iter()
1000                        .map(|l| strategy_from_future(l.has_future_updates))
1001                        .collect()
1002                } else {
1003                    lowered_inputs
1004                        .iter()
1005                        .map(|_| ArrangementStrategy::Direct)
1006                        .collect()
1007                };
1008
1009                let has_future_updates = if consolidate_output {
1010                    // The MergeBatcher will hold back future updates (regardless of whether we are
1011                    // bucketing here or not).
1012                    false
1013                } else {
1014                    lowered_inputs.iter().any(|l| l.has_future_updates)
1015                };
1016
1017                let plans = lowered_inputs
1018                    .into_iter()
1019                    .map(
1020                        |LoweredExpr {
1021                             plan,
1022                             keys,
1023                             has_future_updates: _,
1024                         }| {
1025                            // We don't have an MFP here -- install an operator to permute the
1026                            // input, if necessary.
1027                            if !keys.raw {
1028                                self.arrange_by(
1029                                    plan,
1030                                    AvailableCollections::new_raw(),
1031                                    &keys,
1032                                    arity,
1033                                    // `new_raw` means no arrangement, so no bucketing is needed
1034                                    false,
1035                                )
1036                            } else {
1037                                plan
1038                            }
1039                        },
1040                    )
1041                    .collect();
1042                // Return the plan and no arrangements.
1043                let lir_id = self.allocate_lir_id();
1044                LoweredExpr {
1045                    plan: PlanNode::Union {
1046                        inputs: plans,
1047                        consolidate_output,
1048                        temporal_bucketing_strategies,
1049                    }
1050                    .as_plan(lir_id),
1051                    keys: AvailableCollections::new_raw(),
1052                    has_future_updates,
1053                }
1054            }
1055            MirRelationExpr::ArrangeBy { input, keys } => {
1056                let input_mir = input;
1057                let LoweredExpr {
1058                    plan: input,
1059                    keys: mut input_keys,
1060                    has_future_updates: input_has_future_updates,
1061                } = self.lower_mir_expr(input)?;
1062                // The input arity is needed to compute permutations for newly requested keys.
1063                let arity = input_mir.arity();
1064
1065                // Determine keys that are not present in `input_keys`.
1066                let new_keys = keys
1067                    .iter()
1068                    .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
1069                    .cloned()
1070                    .collect::<Vec<_>>();
1071                if new_keys.is_empty() {
1072                    LoweredExpr {
1073                        plan: input,
1074                        keys: input_keys,
1075                        has_future_updates: input_has_future_updates,
1076                    }
1077                } else {
1078                    let mut new_keys = new_keys
1079                        .iter()
1080                        .cloned()
1081                        .map(|k| {
1082                            let (permutation, thinning) = permutation_for_arrangement(&k, arity);
1083                            (k, permutation, thinning)
1084                        })
1085                        .collect::<Vec<_>>();
1086                    let forms = AvailableCollections {
1087                        raw: input_keys.raw,
1088                        arranged: new_keys.clone(),
1089                    };
1090                    let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1091                        input_keys.arbitrary_arrangement()
1092                    {
1093                        let mut mfp = MapFilterProject::new(arity);
1094                        mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1095                        (Some(input_key.clone()), mfp)
1096                    } else {
1097                        (None, MapFilterProject::new(arity))
1098                    };
1099                    input_keys.arranged.append(&mut new_keys);
1100                    input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1101
1102                    // Return the plan and extended keys.
1103                    let lir_id = self.allocate_lir_id();
1104                    let strategy = strategy_from_future(input_has_future_updates);
1105                    assert!(!forms.arranged.is_empty()); // i.e., we do build an arrangement
1106                    let has_future_updates = false;
1107                    LoweredExpr {
1108                        plan: PlanNode::ArrangeBy {
1109                            input_key,
1110                            input: Box::new(input),
1111                            input_mfp,
1112                            forms,
1113                            strategy,
1114                        }
1115                        .as_plan(lir_id),
1116                        keys: input_keys,
1117                        has_future_updates,
1118                    }
1119                }
1120            }
1121        };
1122
1123        // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
1124        if !mfp.is_identity() {
1125            // Check if this MFP introduces future updates.
1126            let mfp_is_temporal = mfp.has_temporal_predicates();
1127            has_future_updates = has_future_updates || mfp_is_temporal;
1128            // Seek out an arrangement key that might be constrained to a literal.
1129            // TODO: Improve key selection heuristic.
1130            let key_val = keys
1131                .arranged
1132                .iter()
1133                .filter_map(|(key, permutation, thinning)| {
1134                    let mut mfp = mfp.clone();
1135                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1136                    mfp.literal_constraints(key)
1137                        .map(|val| (key.clone(), permutation, thinning, val))
1138                })
1139                .max_by_key(|(key, _, _, _)| key.len());
1140
1141            // Input key selection strategy:
1142            // (1) If we can read a key at a particular value, do so
1143            // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
1144            // therefore allows us to avoid discarding the arrangement, use that.
1145            // (3) Otherwise, if there is _some_ key, use that,
1146            // (4) Otherwise just read the raw collection.
1147            let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
1148                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1149
1150                Some((key, Some(val)))
1151            } else if let Some((key, permutation, thinning)) =
1152                keys.arranged.iter().find(|(key, permutation, thinning)| {
1153                    let mut mfp = mfp.clone();
1154                    mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1155                    mfp.is_identity()
1156                })
1157            {
1158                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1159                Some((key.clone(), None))
1160            } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
1161                mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1162                Some((key.clone(), None))
1163            } else {
1164                None
1165            };
1166
1167            if mfp.is_identity() {
1168                // We have discovered a key
1169                // whose permutation causes the MFP to actually
1170                // be the identity! We can keep it around,
1171                // but without its permutation this time,
1172                // and with a trivial thinning of the right length.
1173                let (key, val) = input_key_val.unwrap();
1174                let (_old_key, old_permutation, old_thinning) = keys
1175                    .arranged
1176                    .iter_mut()
1177                    .find(|(key2, _, _)| key2 == &key)
1178                    .unwrap();
1179                *old_permutation = (0..mfp.input_arity).collect();
1180                let old_thinned_arity = old_thinning.len();
1181                *old_thinning = (0..old_thinned_arity).collect();
1182                // Get rid of all other forms, as this is now the only one known to be valid.
1183                // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
1184                // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
1185                keys.arranged.retain(|(key2, _, _)| key2 == &key);
1186                keys.raw = false;
1187
1188                // Creating a Plan::Mfp node is now logically unnecessary, but we
1189                // should do so anyway when `val` is populated, so that
1190                // the `key_val` optimization gets applied.
1191                let lir_id = self.allocate_lir_id();
1192                if val.is_some() {
1193                    plan = PlanNode::Mfp {
1194                        input: Box::new(plan),
1195                        mfp,
1196                        input_key_val: Some((key, val)),
1197                    }
1198                    .as_plan(lir_id)
1199                }
1200            } else {
1201                let lir_id = self.allocate_lir_id();
1202                plan = PlanNode::Mfp {
1203                    input: Box::new(plan),
1204                    mfp,
1205                    input_key_val,
1206                }
1207                .as_plan(lir_id);
1208                keys = AvailableCollections::new_raw();
1209            }
1210        }
1211
1212        Ok(LoweredExpr {
1213            plan,
1214            keys,
1215            has_future_updates,
1216        })
1217    }
1218
1219    /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
1220    /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
1221    /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
1222    /// fused.
1223    fn lower_reduce(
1224        &mut self,
1225        input: &MirRelationExpr,
1226        group_key: &Vec<MirScalarExpr>,
1227        aggregates: &Vec<AggregateExpr>,
1228        monotonic: &bool,
1229        expected_group_size: &Option<u64>,
1230        mfp_on_top: &mut MapFilterProject,
1231        fused_unnest_list: bool,
1232    ) -> Result<LoweredExpr, String> {
1233        let input_arity = input.arity();
1234        let LoweredExpr {
1235            plan: input,
1236            keys,
1237            has_future_updates: input_future,
1238        } = self.lower_mir_expr(input)?;
1239        let (input_key, permutation_and_new_arity) =
1240            if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
1241                (
1242                    Some(input_key.clone()),
1243                    Some((permutation.clone(), thinning.len() + input_key.len())),
1244                )
1245            } else {
1246                (None, None)
1247            };
1248        let key_val_plan = KeyValPlan::new(
1249            input_arity,
1250            group_key,
1251            aggregates,
1252            permutation_and_new_arity,
1253        );
1254        let reduce_plan = ReducePlan::create_from(
1255            aggregates.clone(),
1256            *monotonic,
1257            *expected_group_size,
1258            fused_unnest_list,
1259        );
1260        // Return the plan, and the keys it produces.
1261        let mfp_after;
1262        let output_arity;
1263        if self.enable_reduce_mfp_fusion {
1264            (mfp_after, *mfp_on_top, output_arity) =
1265                reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1266        } else {
1267            (mfp_after, output_arity) = (
1268                MapFilterProject::new(mfp_on_top.input_arity),
1269                group_key.len() + aggregates.len(),
1270            );
1271        }
1272        soft_assert_eq_or_log!(
1273            mfp_on_top.input_arity,
1274            output_arity,
1275            "Output arity of reduce must match input arity for MFP on top of it"
1276        );
1277        let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1278        let lir_id = self.allocate_lir_id();
1279        // `Reduce` builds its own input arrangement inside `render_reduce` (via `KeyValPlan`),
1280        // bypassing `ensure_collections`. So we can't piggy-back on an upstream `ArrangeBy`'s
1281        // strategy to request temporal bucketing on a temporal-MFP-fed input: there is no such
1282        // `ArrangeBy`. Instead we record the strategy directly on the `Reduce` node, and
1283        // `render_reduce` applies bucketing to the keyed `(key, val)` stream itself.
1284        let temporal_bucketing_strategy = strategy_from_future(input_future);
1285        // (This can't currently happen due to `extract_mfp_after` separating out any temporal part.)
1286        let has_future_updates = mfp_after.has_temporal_predicates();
1287        Ok(LoweredExpr {
1288            plan: PlanNode::Reduce {
1289                input_key,
1290                input: Box::new(input),
1291                key_val_plan,
1292                plan: reduce_plan,
1293                mfp_after,
1294                temporal_bucketing_strategy,
1295            }
1296            .as_plan(lir_id),
1297            keys: output_keys,
1298            has_future_updates,
1299        })
1300    }
1301
1302    /// Replace the plan with another one
1303    /// that has the collection in some additional forms.
1304    pub fn arrange_by(
1305        &mut self,
1306        plan: Plan,
1307        collections: AvailableCollections,
1308        old_collections: &AvailableCollections,
1309        arity: usize,
1310        has_future_updates: bool,
1311    ) -> Plan {
1312        if let Plan {
1313            node:
1314                PlanNode::ArrangeBy {
1315                    input_key,
1316                    input,
1317                    input_mfp,
1318                    mut forms,
1319                    strategy,
1320                },
1321            lir_id,
1322        } = plan
1323        {
1324            forms.raw |= collections.raw;
1325            forms.arranged.extend(collections.arranged);
1326            forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1327            forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1328            PlanNode::ArrangeBy {
1329                input_key,
1330                input,
1331                input_mfp,
1332                forms,
1333                strategy,
1334            }
1335            .as_plan(lir_id)
1336        } else {
1337            let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1338                old_collections.arbitrary_arrangement()
1339            {
1340                let mut mfp = MapFilterProject::new(arity);
1341                mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1342                (Some(input_key.clone()), mfp)
1343            } else {
1344                (None, MapFilterProject::new(arity))
1345            };
1346            let lir_id = self.allocate_lir_id();
1347
1348            PlanNode::ArrangeBy {
1349                input_key,
1350                input: Box::new(plan),
1351                input_mfp,
1352                forms: collections,
1353                strategy: strategy_from_future(has_future_updates),
1354            }
1355            .as_plan(lir_id)
1356        }
1357    }
1358}
1359
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
///
/// The `Display` implementation renders both fields on a single line.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Name shown after `Debug name:` in the `Display` output.
    debug_name: String,
    /// Identifier shown after `id:` in the `Display` output.
    id: GlobalId,
}
1367
1368impl std::fmt::Display for LirDebugInfo {
1369    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1370        write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1371    }
1372}