mz_compute_types/plan/lowering.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Lowering [`DataflowDescription`]s from MIR ([`MirRelationExpr`]) to LIR ([`Plan`]).
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use columnar::Len;
15use itertools::Itertools;
16use mz_expr::JoinImplementation::{DeltaQuery, Differential, IndexedFilter, Unimplemented};
17use mz_expr::{
18 AggregateExpr, Columns, Id, JoinInputMapper, MapFilterProject, MirRelationExpr, MirScalarExpr,
19 OptimizedMirRelationExpr, TableFunc, permutation_for_arrangement,
20};
21use mz_ore::{assert_none, soft_assert_eq_or_log, soft_panic_or_log};
22use mz_repr::optimize::OptimizerFeatures;
23use mz_repr::{GlobalId, Timestamp};
24
25use crate::dataflows::{BuildDesc, DataflowDescription, IndexImport};
26use crate::plan::join::{DeltaJoinPlan, JoinPlan, LinearJoinPlan};
27use crate::plan::reduce::{KeyValPlan, ReducePlan};
28use crate::plan::threshold::ThresholdPlan;
29use crate::plan::top_k::TopKPlan;
30use crate::plan::{ArrangementStrategy, AvailableCollections, GetPlan, LirId, Plan, PlanNode};
31
32/// Pick an [`ArrangementStrategy`] based on whether the input may contain future-stamped
33/// updates. Future updates are the only case where temporal bucketing pays off.
34///
35/// Any arrangement or consolidation that absorbs data that can have future updates should be
36/// guarded by a temporal bucketing operator.
37fn strategy_from_future(has_future_updates: bool) -> ArrangementStrategy {
38 if has_future_updates {
39 ArrangementStrategy::TemporalBucketing
40 } else {
41 ArrangementStrategy::Direct
42 }
43}
44
45/// The result of lowering a [`MirRelationExpr`] to a [`Plan`].
46struct LoweredExpr {
47 /// The lowered plan.
48 plan: Plan,
49 /// The arrangement keys that the plan is certain to produce.
50 keys: AvailableCollections,
51 /// Whether the plan's output may contain updates at future timestamps,
52 /// e.g., from a temporal MFP using `mz_now()`.
53 has_future_updates: bool,
54}
55
56pub(super) struct Context {
57 /// Known bindings to (possibly arranged) collections.
58 arrangements: BTreeMap<Id, AvailableCollections>,
59 /// Ids whose collections may contain updates at future timestamps,
60 /// e.g., from a temporal MFP using `mz_now()`.
61 has_future_updates: BTreeSet<Id>,
62 /// Tracks the next available `LirId`.
63 next_lir_id: LirId,
64 /// Information to print along with error messages.
65 debug_info: LirDebugInfo,
66 /// Whether to enable fusion of MFPs in reductions.
67 enable_reduce_mfp_fusion: bool,
68}
69
70impl Context {
71 pub fn new(debug_name: String, features: &OptimizerFeatures) -> Self {
72 Self {
73 arrangements: Default::default(),
74 has_future_updates: Default::default(),
75 next_lir_id: LirId(1),
76 debug_info: LirDebugInfo {
77 debug_name,
78 id: GlobalId::Transient(0),
79 },
80 enable_reduce_mfp_fusion: features.enable_reduce_mfp_fusion,
81 }
82 }
83
84 fn allocate_lir_id(&mut self) -> LirId {
85 let id = self.next_lir_id;
86 self.next_lir_id = LirId(
87 self.next_lir_id
88 .0
89 .checked_add(1)
90 .expect("No LirId overflow"),
91 );
92 id
93 }
94
95 pub fn lower(
96 mut self,
97 desc: DataflowDescription<OptimizedMirRelationExpr>,
98 ) -> Result<DataflowDescription<Plan>, String> {
99 // Sources might provide arranged forms of their data, in the future.
100 // Indexes provide arranged forms of their data.
101 for IndexImport {
102 desc: index_desc,
103 typ,
104 ..
105 } in desc.index_imports.values()
106 {
107 let key = index_desc.key.clone();
108 // TODO[btv] - We should be told the permutation by
109 // `index_desc`, and it should have been generated
110 // at the same point the thinning logic was.
111 //
112 // We should for sure do that soon, but it requires
113 // a bit of a refactor, so for now we just
114 // _assume_ that they were both generated by `permutation_for_arrangement`,
115 // and recover it here.
116 let (permutation, thinning) = permutation_for_arrangement(&key, typ.arity());
117 let index_keys = self
118 .arrangements
119 .entry(Id::Global(index_desc.on_id))
120 .or_insert_with(AvailableCollections::default);
121 index_keys.arranged.push((key, permutation, thinning));
122 }
123 for id in desc.source_imports.keys() {
124 self.arrangements
125 .entry(Id::Global(*id))
126 .or_insert_with(AvailableCollections::new_raw);
127 }
128
129 // Build each object in order, registering the arrangements it forms.
130 let mut objects_to_build = Vec::with_capacity(desc.objects_to_build.len());
131 for build in desc.objects_to_build {
132 self.debug_info.id = build.id;
133 let LoweredExpr {
134 plan,
135 keys,
136 has_future_updates,
137 } = self.lower_mir_expr(&build.plan)?;
138
139 self.arrangements.insert(Id::Global(build.id), keys);
140 if has_future_updates {
141 self.has_future_updates.insert(Id::Global(build.id));
142 }
143 objects_to_build.push(BuildDesc { id: build.id, plan });
144 }
145
146 Ok(DataflowDescription {
147 source_imports: desc.source_imports,
148 index_imports: desc.index_imports,
149 objects_to_build,
150 index_exports: desc.index_exports,
151 sink_exports: desc.sink_exports,
152 as_of: desc.as_of,
153 until: desc.until,
154 initial_storage_as_of: desc.initial_storage_as_of,
155 refresh_schedule: desc.refresh_schedule,
156 debug_name: desc.debug_name,
157 time_dependence: desc.time_dependence,
158 })
159 }
160
161 /// This method converts a MirRelationExpr into a plan that can be directly rendered.
162 ///
163 /// The rough structure is that we repeatedly extract map/filter/project operators
164 /// from each expression we see, bundle them up as a `MapFilterProject` object, and
165 /// then produce a plan for the combination of that with the next operator.
166 ///
167 /// The method accesses `self.arrangements`, which it will locally add to and remove from for
168 /// `Let` bindings (by the end of the call it should contain the same bindings as when it
169 /// started).
170 ///
171 /// The result of the method is both a `Plan`, but also a list of arrangements that
172 /// are certain to be produced, which can be relied on by the next steps in the plan.
173 /// Each of the arrangement keys is associated with an MFP that must be applied if that
174 /// arrangement is used, to back out the permutation associated with that arrangement.
175 ///
176 /// An empty list of arrangement keys indicates that only a `Collection` stream can
177 /// be assumed to exist.
178 fn lower_mir_expr(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
179 // This function is recursive and can overflow its stack, so grow it if
180 // needed. The growth here is unbounded. Our general solution for this problem
181 // is to use [`ore::stack::RecursionGuard`] to additionally limit the stack
182 // depth. That however requires upstream error handling. This function is
183 // currently called by the Coordinator after calls to `catalog_transact`,
184 // and thus are not allowed to fail. Until that allows errors, we choose
185 // to allow the unbounded growth here. We are though somewhat protected by
186 // higher levels enforcing their own limits on stack depth (in the parser,
187 // transformer/desugarer, and planner).
188 mz_ore::stack::maybe_grow(|| self.lower_mir_expr_stack_safe(expr))
189 }
190
191 fn lower_mir_expr_stack_safe(&mut self, expr: &MirRelationExpr) -> Result<LoweredExpr, String> {
192 // Extract a maximally large MapFilterProject from `expr`.
193 // We will then try and push this in to the resulting expression.
194 //
195 // Importantly, `mfp` may contain temporal operators and not be a "safe" MFP.
196 // While we would eventually like all plan stages to be able to absorb such
197 // general operators, not all of them can.
198 let (mut mfp, expr) = MapFilterProject::extract_from_expression(expr);
199 // We attempt to plan what we have remaining, in the context of `mfp`.
200 // We may not be able to do this, and must wrap some operators with a `Mfp` stage.
201 let LoweredExpr {
202 mut plan,
203 mut keys,
204 mut has_future_updates,
205 } = match expr {
206 // These operators should have been extracted from the expression.
207 MirRelationExpr::Map { .. } => {
208 panic!("This operator should have been extracted");
209 }
210 MirRelationExpr::Filter { .. } => {
211 panic!("This operator should have been extracted");
212 }
213 MirRelationExpr::Project { .. } => {
214 panic!("This operator should have been extracted");
215 }
216 // These operators may not have been extracted, and need to result in a `Plan`.
217 MirRelationExpr::Constant { rows, typ: _ } => {
218 let lir_id = self.allocate_lir_id();
219 let node = PlanNode::Constant {
220 rows: rows.clone().map(|rows| {
221 rows.into_iter()
222 .map(|(row, diff)| (row, Timestamp::MIN, diff))
223 .collect()
224 }),
225 };
226 // The plan, not arranged in any way.
227 LoweredExpr {
228 plan: node.as_plan(lir_id),
229 keys: AvailableCollections::new_raw(),
230 has_future_updates: false,
231 }
232 }
233 MirRelationExpr::Get { id, typ: _, .. } => {
234 // This stage can absorb arbitrary MFP operators.
235 let mut mfp = mfp.take();
236 // If `mfp` is the identity, we can surface all imported arrangements.
237 // Otherwise, we apply `mfp` and promise no arrangements.
238 let mut in_keys = self
239 .arrangements
240 .get(id)
241 .cloned()
242 .unwrap_or_else(AvailableCollections::new_raw);
243
244 // Seek out an arrangement key that might be constrained to a literal.
245 // Note: this code has very little use nowadays, as its job was mostly taken over
246 // by `LiteralConstraints` (see in the below longer comment).
247 let key_val = in_keys
248 .arranged
249 .iter()
250 .filter_map(|key| {
251 mfp.literal_constraints(&key.0)
252 .map(|val| (key.clone(), val))
253 })
254 .max_by_key(|(key, _val)| key.0.len());
255
256 // Determine the plan of action for the `Get` stage.
257 let plan = if let Some(((key, permutation, thinning), val)) = &key_val {
258 // This code path used to handle looking up literals from indexes, but it's
259 // mostly deprecated, as this is nowadays performed by the `LiteralConstraints`
260 // MIR transform instead. However, it's still called in a couple of tricky
261 // special cases:
262 // - `LiteralConstraints` handles only Gets of global ids, so this code still
263 // gets to handle Filters on top of Gets of local ids.
264 // - Lowering does a `MapFilterProject::extract_from_expression`, while
265 // `LiteralConstraints` does
266 // `MapFilterProject::extract_non_errors_from_expr_mut`.
267 // - It might happen that new literal constraint optimization opportunities
268 // appear somewhere near the end of the MIR optimizer after
269 // `LiteralConstraints` has already run.
270 // (Also note that a similar literal constraint handling machinery is also
271 // present when handling the leftover MFP after this big match.)
272 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
273 in_keys.arranged = vec![(key.clone(), permutation.clone(), thinning.clone())];
274 GetPlan::Arrangement(key.clone(), Some(val.clone()), mfp)
275 } else if !mfp.is_identity() {
276 // We need to ensure a collection exists, which means we must form it.
277 if let Some((key, permutation, thinning)) =
278 in_keys.arbitrary_arrangement().cloned()
279 {
280 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
281 in_keys.arranged = vec![(key.clone(), permutation, thinning)];
282 GetPlan::Arrangement(key, None, mfp)
283 } else {
284 GetPlan::Collection(mfp)
285 }
286 } else {
287 // By default, just pass input arrangements through.
288 GetPlan::PassArrangements
289 };
290
291 let out_keys = if let GetPlan::PassArrangements = plan {
292 in_keys.clone()
293 } else {
294 AvailableCollections::new_raw()
295 };
296
297 // Even with a non-temporal MFP, we must propagate `has_future_updates`
298 // from the underlying binding — applying an MFP doesn't drop future-
299 // timestamped updates that already exist on the input.
300 //
301 // Note that global Gets from different dataflows can't have future updates, because
302 // both indexes and materialized views hold back future updates.
303 let has_future_updates = self.has_future_updates.contains(id)
304 || match &plan {
305 GetPlan::Arrangement(_, _, mfp) | GetPlan::Collection(mfp) => {
306 mfp.has_temporal_predicates()
307 }
308 GetPlan::PassArrangements => false,
309 };
310
311 let lir_id = self.allocate_lir_id();
312 let node = PlanNode::Get {
313 id: id.clone(),
314 keys: in_keys,
315 plan,
316 };
317 // Return the plan, and any keys if an identity `mfp`.
318 LoweredExpr {
319 plan: node.as_plan(lir_id),
320 keys: out_keys,
321 has_future_updates,
322 }
323 }
324 MirRelationExpr::Let { id, value, body } => {
325 // It would be unfortunate to have a non-trivial `mfp` here, as we hope
326 // that they would be pushed down. I am not sure if we should take the
327 // initiative to push down the `mfp` ourselves.
328
329 // Plan the value using only the initial arrangements, but
330 // introduce any resulting arrangements bound to `id`.
331 let LoweredExpr {
332 plan: value,
333 keys: v_keys,
334 has_future_updates: v_future,
335 } = self.lower_mir_expr(value)?;
336 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
337 assert_none!(pre_existing);
338 if v_future {
339 self.has_future_updates.insert(Id::Local(*id));
340 }
341 // Plan the body using initial and `value` arrangements,
342 // and then remove reference to the value arrangements.
343 let LoweredExpr {
344 plan: body,
345 keys: b_keys,
346 has_future_updates: b_future,
347 } = self.lower_mir_expr(body)?;
348 self.arrangements.remove(&Id::Local(*id));
349 self.has_future_updates.remove(&Id::Local(*id));
350 // Return the plan, and any `body` arrangements.
351 let lir_id = self.allocate_lir_id();
352 LoweredExpr {
353 plan: PlanNode::Let {
354 id: id.clone(),
355 value: Box::new(value),
356 body: Box::new(body),
357 }
358 .as_plan(lir_id),
359 keys: b_keys,
360 has_future_updates: b_future,
361 }
362 }
363 MirRelationExpr::LetRec {
364 ids,
365 values,
366 limits,
367 body,
368 } => {
369 assert_eq!(ids.len(), values.len());
370 assert_eq!(ids.len(), limits.len());
371 // Plan the values using only the available arrangements, but
372 // introduce any resulting arrangements bound to each `id`.
373 // Arrangements made available cannot be used by prior bindings,
374 // as we cannot circulate an arrangement through a `Variable` yet.
375 let mut lir_values = Vec::with_capacity(values.len());
376 let mut any_v_future = false;
377 for (id, value) in ids.iter().zip_eq(values) {
378 let LoweredExpr {
379 plan: mut lir_value,
380 keys: mut v_keys,
381 has_future_updates: v_future,
382 } = self.lower_mir_expr(value)?;
383 any_v_future |= v_future;
384 // If `v_keys` does not contain an unarranged collection, we must form it.
385 if !v_keys.raw {
386 // Choose an "arbitrary" arrangement; TODO: prefer a specific one.
387 let (input_key, permutation, thinning) =
388 v_keys.arbitrary_arrangement().unwrap();
389 let mut input_mfp = MapFilterProject::new(value.arity());
390 input_mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
391 let input_key = Some(input_key.clone());
392
393 let forms = AvailableCollections::new_raw();
394
395 // We just want to insert an `ArrangeBy` to form an unarranged collection,
396 // but there is a complication: We shouldn't break the invariant (created by
397 // `NormalizeLets`, and relied upon by the rendering) that there isn't
398 // anything between two `LetRec`s. So if `lir_value` is itself a `LetRec`,
399 // then we insert the `ArrangeBy` on the `body` of the inner `LetRec`,
400 // instead of on top of the inner `LetRec`.
401 //
402 // We forward `v_future` for honesty; bucketing has no observable effect
403 // inside an iterative scope, but the field should reflect reality.
404 lir_value = match lir_value {
405 Plan {
406 node:
407 PlanNode::LetRec {
408 ids,
409 values,
410 limits,
411 body,
412 },
413 lir_id,
414 } => {
415 let inner_lir_id = self.allocate_lir_id();
416 PlanNode::LetRec {
417 ids,
418 values,
419 limits,
420 body: Box::new(
421 PlanNode::ArrangeBy {
422 input_key,
423 input: body,
424 input_mfp,
425 forms,
426 strategy: strategy_from_future(v_future),
427 }
428 .as_plan(inner_lir_id),
429 ),
430 }
431 .as_plan(lir_id)
432 }
433 lir_value => {
434 let lir_id = self.allocate_lir_id();
435 PlanNode::ArrangeBy {
436 input_key,
437 input: Box::new(lir_value),
438 input_mfp,
439 forms,
440 strategy: strategy_from_future(v_future),
441 }
442 .as_plan(lir_id)
443 }
444 };
445 v_keys.raw = true;
446 }
447 let pre_existing = self.arrangements.insert(Id::Local(*id), v_keys);
448 assert_none!(pre_existing);
449 if v_future {
450 self.has_future_updates.insert(Id::Local(*id));
451 }
452 lir_values.push(lir_value);
453 }
454 // As we exit the iterative scope, we must leave all arrangements behind,
455 // as they reference a timestamp coordinate that must be stripped off.
456 for id in ids.iter() {
457 self.arrangements
458 .insert(Id::Local(*id), AvailableCollections::new_raw());
459 }
460 // Plan the body using initial and `value` arrangements,
461 // and then remove reference to the value arrangements.
462 let LoweredExpr {
463 plan: body,
464 keys: b_keys,
465 has_future_updates: b_future,
466 } = self.lower_mir_expr(body)?;
467 for id in ids.iter() {
468 self.arrangements.remove(&Id::Local(*id));
469 self.has_future_updates.remove(&Id::Local(*id));
470 }
471 // Return the plan, and any `body` arrangements.
472 //
473 // The body's `b_future` alone can under-report: an earlier binding may only
474 // inherit `has_future_updates` via a Variable to a *later* binding, which the
475 // sequential sweep can't observe at the time the earlier binding is lowered.
476 // A precise fix would require a fixpoint (or the MIR `Analysis` framework with
477 // a `true ⊑ false` lattice). As a cheap correct alternative, OR with the
478 // bindings' future flags: any cross-binding propagation must originate from a
479 // local temporal predicate inside *some* binding, so the OR captures it
480 // without forcing bucketing on a fully non-temporal LetRec.
481 let lir_id = self.allocate_lir_id();
482 LoweredExpr {
483 plan: PlanNode::LetRec {
484 ids: ids.clone(),
485 values: lir_values,
486 limits: limits.clone(),
487 body: Box::new(body),
488 }
489 .as_plan(lir_id),
490 keys: b_keys,
491 has_future_updates: b_future || any_v_future,
492 }
493 }
494 MirRelationExpr::FlatMap {
495 input: flat_map_input,
496 func,
497 exprs,
498 } => {
499 // A `FlatMap UnnestList` that comes after the `Reduce` of a window function can be
500 // fused into the lowered `Reduce`.
501 //
502 // In theory, we could have implemented this also as an MIR transform. However, this
503 // is more of a physical optimization, which are sometimes unpleasant to make a part
504 // of the MIR pipeline. The specific problem here with putting this into the MIR
505 // pipeline would be that we'd need to modify MIR's semantics: MIR's Reduce
506 // currently always emits exactly 1 row per group, but the fused Reduce-FlatMap can
507 // emit multiple rows per group. Such semantic changes of MIR are very scary, since
508 // various parts of the optimizer assume that Reduce emits only 1 row per group, and
509 // it would be very hard to hunt down all these parts. (For example, key inference
510 // infers the group key as a unique key.)
511 let fused_with_reduce = 'fusion: {
512 if !matches!(func, TableFunc::UnnestList { .. }) {
513 break 'fusion None;
514 }
515 // We might have a Project of a single col between the FlatMap and the
516 // Reduce. (It projects away the grouping keys of the Reduce, and keeps the
517 // result of the window function.)
518 let (maybe_reduce, num_grouping_keys) = if let MirRelationExpr::Project {
519 input: project_input,
520 outputs: projection,
521 } = &**flat_map_input
522 {
523 // We want this to be a single column, because we'll want to deal with only
524 // one aggregation in the `Reduce`. (The aggregation of a window function
525 // always stands alone currently: we plan them separately from other
526 // aggregations, and Reduces are never fused. When window functions are
527 // fused with each other, they end up in one aggregation. When there are
528 // multiple window functions in the same SELECT, but can't be fused, they
529 // end up in different Reduces.)
530 if let &[single_col] = &**projection {
531 (project_input, single_col)
532 } else {
533 break 'fusion None;
534 }
535 } else {
536 (flat_map_input, 0)
537 };
538 if let MirRelationExpr::Reduce {
539 input,
540 group_key,
541 aggregates,
542 monotonic,
543 expected_group_size,
544 } = &**maybe_reduce
545 {
546 if group_key.len() != num_grouping_keys
547 || aggregates.len() != 1
548 || !aggregates[0].func.can_fuse_with_unnest_list()
549 {
550 break 'fusion None;
551 }
552 // At the beginning, `non_fused_mfp_above_flat_map` will be the original MFP
553 // above the FlatMap. Later, we'll mutate this to be the residual MFP that
554 // didn't get fused into the `Reduce`.
555 let non_fused_mfp_above_flat_map = &mut mfp;
556 let reduce_output_arity = num_grouping_keys + 1;
557 // We are fusing away the list that the FlatMap would have been unnesting,
558 // so the column that had that list disappears, so we have to permute the
559 // MFP above the FlatMap with this column disappearance.
560 let tweaked_mfp = {
561 let mut mfp = non_fused_mfp_above_flat_map.clone();
562 if mfp.demand().contains(&0) {
563 // I don't think this can happen currently that this MFP would
564 // refer to the list column, because both the list column and the
565 // MFP were constructed by the HIR-to-MIR lowering, so it's not just
566 // some random MFP that we are seeing here. But anyhow, it's better
567 // to check this here for robustness against future code changes.
568 break 'fusion None;
569 }
570 let permutation: BTreeMap<_, _> =
571 (1..mfp.input_arity).map(|col| (col, col - 1)).collect();
572 mfp.permute_fn(|c| permutation[&c], mfp.input_arity - 1);
573 mfp
574 };
575 // We now put together the project that was before the FlatMap, and the
576 // tweaked version of the MFP that was after the FlatMap.
577 // (Part of this MFP might be fused into the Reduce.)
578 let mut project_and_tweaked_mfp = {
579 let mut mfp = MapFilterProject::new(reduce_output_arity);
580 mfp = mfp.project(vec![num_grouping_keys]);
581 mfp = MapFilterProject::compose(mfp, tweaked_mfp);
582 mfp
583 };
584 let fused = self.lower_reduce(
585 input,
586 group_key,
587 aggregates,
588 monotonic,
589 expected_group_size,
590 &mut project_and_tweaked_mfp,
591 true,
592 )?;
593 // Update the residual MFP.
594 *non_fused_mfp_above_flat_map = project_and_tweaked_mfp;
595 Some(fused)
596 } else {
597 break 'fusion None;
598 }
599 };
600 if let Some(fused_with_reduce) = fused_with_reduce {
601 fused_with_reduce
602 } else {
603 // Couldn't fuse it with a `Reduce`, so lower as a normal `FlatMap`.
604 let LoweredExpr {
605 plan: input,
606 keys,
607 has_future_updates: input_future,
608 } = self.lower_mir_expr(flat_map_input)?;
609 // This stage can absorb arbitrary MFP instances.
610 let mut mfp = mfp.take();
611 let mut exprs = exprs.clone();
612 // Prefer the unarranged collection when present: it presents input columns
613 // in logical order, so no permutation is required.
614 let input_key = if keys.raw {
615 None
616 } else if let Some((k, permutation, thinning)) = keys.arbitrary_arrangement() {
617 // Reading from this arrangement exposes input columns in arrangement
618 // order (key columns followed by thinned value columns). We must
619 // permute every reference to an input column accordingly: the
620 // `expr`s feeding the table function arguments, and the `mfp` running
621 // after the table function (which still references input columns at
622 // positions `0..input_arity`).
623 //
624 // The renderer hands the `mfp` the *whole* arranged row and appends the
625 // table-function output after it. The arranged row can be wider than the
626 // logical input row when the key is not a set of distinct columns (an
627 // expression, functional, or repeated-column key carries extra key
628 // values). So the table-function output columns at positions
629 // `input_arity..` must be shifted to land after the arranged row, and the
630 // `mfp`'s new input arity must reflect the arranged width.
631 for expr in &mut exprs {
632 expr.permute(permutation);
633 }
634 let input_arity = permutation.len();
635 let arranged_arity = thinning.len() + k.len();
636 let output_arity = mfp.input_arity - input_arity;
637 mfp.permute_fn(
638 |c| {
639 if c < input_arity {
640 permutation[c]
641 } else {
642 arranged_arity + (c - input_arity)
643 }
644 },
645 arranged_arity + output_arity,
646 );
647 Some(k.clone())
648 } else {
649 None
650 };
651
652 let lir_id = self.allocate_lir_id();
653 // The absorbed `mfp` may contain temporal predicates, which can
654 // introduce future-stamped updates that aren't present on the input.
655 let has_future_updates = input_future || mfp.has_temporal_predicates();
656 // Return the plan, and no arrangements.
657 LoweredExpr {
658 plan: PlanNode::FlatMap {
659 input_key,
660 input: Box::new(input),
661 exprs,
662 func: func.clone(),
663 mfp_after: mfp,
664 }
665 .as_plan(lir_id),
666 keys: AvailableCollections::new_raw(),
667 has_future_updates,
668 }
669 }
670 }
671 MirRelationExpr::Join {
672 inputs,
673 equivalences,
674 implementation,
675 } => {
676 // Plan each of the join inputs independently.
677 // The `plans` get surfaced upwards, and the `input_keys` should
678 // be used as part of join planning / to validate the existing
679 // plans / to aid in indexed seeding of update streams.
680 let mut plans = Vec::new();
681 let mut input_keys = Vec::new();
682 let mut input_arities = Vec::new();
683 let mut input_futures = Vec::new();
684 for input in inputs.iter() {
685 let LoweredExpr {
686 plan,
687 keys,
688 has_future_updates: input_future,
689 } = self.lower_mir_expr(input)?;
690 input_arities.push(input.arity());
691 plans.push(plan);
692 input_keys.push(keys);
693 input_futures.push(input_future);
694 }
695 let any_input_future = input_futures.iter().any(|&f| f);
696
697 let input_mapper =
698 JoinInputMapper::new_from_input_arities(input_arities.iter().copied());
699
700 // Extract temporal predicates as joins cannot currently absorb them.
701 let (plan, missing) = match implementation {
702 IndexedFilter(_coll_id, _idx_id, key, _val) => {
703 // Start with the constant input. (This used to be important before database-issues#4016
704 // was fixed.)
705 let start: usize = 1;
706 let order = vec![(0usize, key.clone(), None)];
707 // All columns of the constant input will be part of the arrangement key.
708 let source_arrangement = (
709 (0..key.len())
710 .map(MirScalarExpr::column)
711 .collect::<Vec<_>>(),
712 (0..key.len()).collect::<Vec<_>>(),
713 Vec::<usize>::new(),
714 );
715 let (ljp, missing) = LinearJoinPlan::create_from(
716 start,
717 Some(&source_arrangement),
718 equivalences,
719 &order,
720 input_mapper,
721 &mut mfp,
722 &input_keys,
723 );
724 (JoinPlan::Linear(ljp), missing)
725 }
726 Differential((start, start_arr, _start_characteristic), order) => {
727 let source_arrangement = start_arr.as_ref().and_then(|key| {
728 input_keys[*start]
729 .arranged
730 .iter()
731 .find(|(k, _, _)| k == key)
732 .clone()
733 });
734 let (ljp, missing) = LinearJoinPlan::create_from(
735 *start,
736 source_arrangement,
737 equivalences,
738 order,
739 input_mapper,
740 &mut mfp,
741 &input_keys,
742 );
743 (JoinPlan::Linear(ljp), missing)
744 }
745 DeltaQuery(orders) => {
746 let (djp, missing) = DeltaJoinPlan::create_from(
747 equivalences,
748 orders,
749 input_mapper,
750 &mut mfp,
751 &input_keys,
752 );
753 (JoinPlan::Delta(djp), missing)
754 }
755 // Other plans are errors, and should be reported as such.
756 Unimplemented => return Err("unimplemented join".to_string()),
757 };
758 // The renderer will expect certain arrangements to exist; if any of those are not available, the join planning functions above should have returned them in
759 // `missing`. We thus need to plan them here so they'll exist.
760 let is_delta = matches!(plan, JoinPlan::Delta(_));
761 for ((((input_plan, input_keys), missing), arity), input_future) in plans
762 .iter_mut()
763 .zip_eq(input_keys.iter())
764 .zip_eq(missing)
765 .zip_eq(input_arities.iter().cloned())
766 .zip_eq(input_futures.iter().copied())
767 {
768 if missing != Default::default() {
769 if is_delta {
770 // join_implementation.rs produced a sub-optimal plan here;
771 // we shouldn't plan delta joins at all if not all of the required
772 // arrangements are available. Soft panic in CI and log an error in
773 // production to increase the chances that we will catch all situations
774 // that violate this constraint.
775 soft_panic_or_log!("Arrangements depended on by delta join alarmingly absent: {:?}
776Dataflow info: {}
777This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
778 } else {
779 soft_panic_or_log!("Arrangements depended on by a non-delta join are absent: {:?}
780Dataflow info: {}
781This is not expected to cause incorrect results, but could indicate a performance issue in Materialize.", missing, self.debug_info);
782 // Nowadays MIR transforms take care to insert MIR ArrangeBys for each
783 // Join input. (Earlier, they were missing in the following cases:
784 // - They were const-folded away for constant inputs. This is not
785 // happening since
786 // https://github.com/MaterializeInc/materialize/pull/16351
787 // - They were not being inserted for the constant input of
788 // `IndexedFilter`s. This was fixed in
789 // https://github.com/MaterializeInc/materialize/pull/20920
790 // - They were not being inserted for the first input of Differential
791 // joins. This was fixed in
792 // https://github.com/MaterializeInc/materialize/pull/16099)
793 }
794 let lir_id = self.allocate_lir_id();
795 let raw_plan = std::mem::replace(
796 input_plan,
797 PlanNode::Constant {
798 rows: Ok(Vec::new()),
799 }
800 .as_plan(lir_id),
801 );
802 *input_plan =
803 self.arrange_by(raw_plan, missing, input_keys, arity, input_future);
804 }
805 }
806 // Return the plan, and no arrangements.
807 // Both linear and delta join planning extract temporal predicates back into the
808 // residual `mfp` (see `LinearJoinPlan::create_from` / `DeltaJoinPlan::create_from`),
809 // so the absorbed MFP cannot introduce future updates — the join's output future
810 // flag is just the OR of its inputs.
811 let lir_id = self.allocate_lir_id();
812 LoweredExpr {
813 plan: PlanNode::Join {
814 inputs: plans,
815 plan,
816 }
817 .as_plan(lir_id),
818 keys: AvailableCollections::new_raw(),
819 has_future_updates: any_input_future,
820 }
821 }
822 MirRelationExpr::Reduce {
823 input,
824 group_key,
825 aggregates,
826 monotonic,
827 expected_group_size,
828 } => {
829 if aggregates
830 .iter()
831 .any(|agg| agg.func.can_fuse_with_unnest_list())
832 {
833 // This case should have been handled at the `MirRelationExpr::FlatMap` case
834 // above. But that has a pretty complicated pattern matching, so it's not
835 // unthinkable that it fails.
836 soft_panic_or_log!(
837 "Window function performance issue: `reduce_unnest_list_fusion` failed"
838 );
839 }
840 self.lower_reduce(
841 input,
842 group_key,
843 aggregates,
844 monotonic,
845 expected_group_size,
846 &mut mfp,
847 false,
848 )?
849 }
850 MirRelationExpr::TopK {
851 input,
852 group_key,
853 order_key,
854 limit,
855 offset,
856 monotonic,
857 expected_group_size,
858 } => {
859 let arity = input.arity();
860 let LoweredExpr {
861 plan: input,
862 keys,
863 has_future_updates: input_future,
864 } = self.lower_mir_expr(input)?;
865
866 let top_k_plan = TopKPlan::create_from(
867 group_key.clone(),
868 order_key.clone(),
869 *offset,
870 limit.clone(),
871 arity,
872 *monotonic,
873 *expected_group_size,
874 );
875
876 // We don't have an MFP here -- install an operator to permute the
877 // input, if necessary.
878 let input = if !keys.raw {
879 self.arrange_by(
880 input,
881 AvailableCollections::new_raw(),
882 &keys,
883 arity,
884 // `new_raw` means no arrangement, so no bucketing is needed
885 false,
886 )
887 } else {
888 input
889 };
890 // Return the plan, and no arrangements.
891 let temporal_bucketing_strategy = strategy_from_future(input_future);
892 let lir_id = self.allocate_lir_id();
893 LoweredExpr {
894 plan: PlanNode::TopK {
895 input: Box::new(input),
896 top_k_plan,
897 temporal_bucketing_strategy,
898 }
899 .as_plan(lir_id),
900 keys: AvailableCollections::new_raw(),
901 has_future_updates: false,
902 }
903 }
904 MirRelationExpr::Negate { input } => {
905 let arity = input.arity();
906 let LoweredExpr {
907 plan: input,
908 keys,
909 has_future_updates: input_future,
910 } = self.lower_mir_expr(input)?;
911
912 // We don't have an MFP here -- install an operator to permute the
913 // input, if necessary.
914 let input = if !keys.raw {
915 self.arrange_by(
916 input,
917 AvailableCollections::new_raw(),
918 &keys,
919 arity,
920 // `new_raw` means no arrangement, so no bucketing is needed
921 false,
922 )
923 } else {
924 input
925 };
926 // Return the plan, and no arrangements.
927 let lir_id = self.allocate_lir_id();
928 LoweredExpr {
929 plan: PlanNode::Negate {
930 input: Box::new(input),
931 }
932 .as_plan(lir_id),
933 keys: AvailableCollections::new_raw(),
934 has_future_updates: input_future,
935 }
936 }
937 MirRelationExpr::Threshold { input } => {
938 let LoweredExpr {
939 plan,
940 keys,
941 has_future_updates: input_future,
942 } = self.lower_mir_expr(input)?;
943 let arity = input.arity();
944 let (threshold_plan, required_arrangement) = ThresholdPlan::create_from(arity);
945
946 let plan = if !keys
947 .arranged
948 .iter()
949 .any(|(key, _, _)| key == &required_arrangement.0)
950 {
951 self.arrange_by(
952 plan,
953 AvailableCollections::new_arranged(vec![required_arrangement]),
954 &keys,
955 arity,
956 input_future,
957 )
958 } else {
959 plan
960 };
961
962 let output_keys = threshold_plan.keys();
963 // Return the plan, and any produced keys.
964 let lir_id = self.allocate_lir_id();
965 LoweredExpr {
966 plan: PlanNode::Threshold {
967 input: Box::new(plan),
968 threshold_plan,
969 }
970 .as_plan(lir_id),
971 keys: output_keys,
972 // Threshold builds its own output arrangement whose
973 // MergeBatcher absorbs future-stamped updates, so no
974 // future updates flow out.
975 has_future_updates: false,
976 }
977 }
978 MirRelationExpr::Union { base, inputs } => {
979 let arity = base.arity();
980 let mut lowered_inputs = Vec::with_capacity(1 + inputs.len());
981 lowered_inputs.push(self.lower_mir_expr(base)?);
982 for input in inputs.iter() {
983 lowered_inputs.push(self.lower_mir_expr(input)?);
984 }
985
986 // A Union with any `Negate` input should consolidate its
987 // output. The lowering is the only place where this decision
988 // can be coupled with the per-input bucketing strategy.
989 let consolidate_output = lowered_inputs
990 .iter()
991 .any(|l| matches!(l.plan.node, PlanNode::Negate { .. }));
992
993 // Per-input bucketing strategies: only meaningful when the
994 // Union consolidates its output, since bucketing only pays off
995 // ahead of a downstream consolidator.
996 let temporal_bucketing_strategies: Vec<ArrangementStrategy> = if consolidate_output
997 {
998 lowered_inputs
999 .iter()
1000 .map(|l| strategy_from_future(l.has_future_updates))
1001 .collect()
1002 } else {
1003 lowered_inputs
1004 .iter()
1005 .map(|_| ArrangementStrategy::Direct)
1006 .collect()
1007 };
1008
1009 let has_future_updates = if consolidate_output {
1010 // The MergeBatcher will hold back future updates (regardless of whether we are
1011 // bucketing here or not).
1012 false
1013 } else {
1014 lowered_inputs.iter().any(|l| l.has_future_updates)
1015 };
1016
1017 let plans = lowered_inputs
1018 .into_iter()
1019 .map(
1020 |LoweredExpr {
1021 plan,
1022 keys,
1023 has_future_updates: _,
1024 }| {
1025 // We don't have an MFP here -- install an operator to permute the
1026 // input, if necessary.
1027 if !keys.raw {
1028 self.arrange_by(
1029 plan,
1030 AvailableCollections::new_raw(),
1031 &keys,
1032 arity,
1033 // `new_raw` means no arrangement, so no bucketing is needed
1034 false,
1035 )
1036 } else {
1037 plan
1038 }
1039 },
1040 )
1041 .collect();
1042 // Return the plan and no arrangements.
1043 let lir_id = self.allocate_lir_id();
1044 LoweredExpr {
1045 plan: PlanNode::Union {
1046 inputs: plans,
1047 consolidate_output,
1048 temporal_bucketing_strategies,
1049 }
1050 .as_plan(lir_id),
1051 keys: AvailableCollections::new_raw(),
1052 has_future_updates,
1053 }
1054 }
1055 MirRelationExpr::ArrangeBy { input, keys } => {
1056 let input_mir = input;
1057 let LoweredExpr {
1058 plan: input,
1059 keys: mut input_keys,
1060 has_future_updates: input_has_future_updates,
1061 } = self.lower_mir_expr(input)?;
1062 // Fill the `types` in `input_keys` if not already present.
1063 let arity = input_mir.arity();
1064
1065 // Determine keys that are not present in `input_keys`.
1066 let new_keys = keys
1067 .iter()
1068 .filter(|k1| !input_keys.arranged.iter().any(|(k2, _, _)| k1 == &k2))
1069 .cloned()
1070 .collect::<Vec<_>>();
1071 if new_keys.is_empty() {
1072 LoweredExpr {
1073 plan: input,
1074 keys: input_keys,
1075 has_future_updates: input_has_future_updates,
1076 }
1077 } else {
1078 let mut new_keys = new_keys
1079 .iter()
1080 .cloned()
1081 .map(|k| {
1082 let (permutation, thinning) = permutation_for_arrangement(&k, arity);
1083 (k, permutation, thinning)
1084 })
1085 .collect::<Vec<_>>();
1086 let forms = AvailableCollections {
1087 raw: input_keys.raw,
1088 arranged: new_keys.clone(),
1089 };
1090 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1091 input_keys.arbitrary_arrangement()
1092 {
1093 let mut mfp = MapFilterProject::new(arity);
1094 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1095 (Some(input_key.clone()), mfp)
1096 } else {
1097 (None, MapFilterProject::new(arity))
1098 };
1099 input_keys.arranged.append(&mut new_keys);
1100 input_keys.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1101
1102 // Return the plan and extended keys.
1103 let lir_id = self.allocate_lir_id();
1104 let strategy = strategy_from_future(input_has_future_updates);
1105 assert!(!forms.arranged.is_empty()); // i.e., we do build an arrangement
1106 let has_future_updates = false;
1107 LoweredExpr {
1108 plan: PlanNode::ArrangeBy {
1109 input_key,
1110 input: Box::new(input),
1111 input_mfp,
1112 forms,
1113 strategy,
1114 }
1115 .as_plan(lir_id),
1116 keys: input_keys,
1117 has_future_updates,
1118 }
1119 }
1120 }
1121 };
1122
1123 // If the plan stage did not absorb all linear operators, introduce a new stage to implement them.
1124 if !mfp.is_identity() {
1125 // Check if this MFP introduces future updates.
1126 let mfp_is_temporal = mfp.has_temporal_predicates();
1127 has_future_updates = has_future_updates || mfp_is_temporal;
1128 // Seek out an arrangement key that might be constrained to a literal.
1129 // TODO: Improve key selection heuristic.
1130 let key_val = keys
1131 .arranged
1132 .iter()
1133 .filter_map(|(key, permutation, thinning)| {
1134 let mut mfp = mfp.clone();
1135 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1136 mfp.literal_constraints(key)
1137 .map(|val| (key.clone(), permutation, thinning, val))
1138 })
1139 .max_by_key(|(key, _, _, _)| key.len());
1140
1141 // Input key selection strategy:
1142 // (1) If we can read a key at a particular value, do so
1143 // (2) Otherwise, if there is a key that causes the MFP to be the identity, and
1144 // therefore allows us to avoid discarding the arrangement, use that.
1145 // (3) Otherwise, if there is _some_ key, use that,
1146 // (4) Otherwise just read the raw collection.
1147 let input_key_val = if let Some((key, permutation, thinning, val)) = key_val {
1148 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1149
1150 Some((key, Some(val)))
1151 } else if let Some((key, permutation, thinning)) =
1152 keys.arranged.iter().find(|(key, permutation, thinning)| {
1153 let mut mfp = mfp.clone();
1154 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1155 mfp.is_identity()
1156 })
1157 {
1158 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1159 Some((key.clone(), None))
1160 } else if let Some((key, permutation, thinning)) = keys.arbitrary_arrangement() {
1161 mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
1162 Some((key.clone(), None))
1163 } else {
1164 None
1165 };
1166
1167 if mfp.is_identity() {
1168 // We have discovered a key
1169 // whose permutation causes the MFP to actually
1170 // be the identity! We can keep it around,
1171 // but without its permutation this time,
1172 // and with a trivial thinning of the right length.
1173 let (key, val) = input_key_val.unwrap();
1174 let (_old_key, old_permutation, old_thinning) = keys
1175 .arranged
1176 .iter_mut()
1177 .find(|(key2, _, _)| key2 == &key)
1178 .unwrap();
1179 *old_permutation = (0..mfp.input_arity).collect();
1180 let old_thinned_arity = old_thinning.len();
1181 *old_thinning = (0..old_thinned_arity).collect();
1182 // Get rid of all other forms, as this is now the only one known to be valid.
1183 // TODO[btv] we can probably save the other arrangements too, if we adjust their permutations.
1184 // This is not hard to do, but leaving it for a quick follow-up to avoid making the present diff too unwieldy.
1185 keys.arranged.retain(|(key2, _, _)| key2 == &key);
1186 keys.raw = false;
1187
1188 // Creating a Plan::Mfp node is now logically unnecessary, but we
1189 // should do so anyway when `val` is populated, so that
1190 // the `key_val` optimization gets applied.
1191 let lir_id = self.allocate_lir_id();
1192 if val.is_some() {
1193 plan = PlanNode::Mfp {
1194 input: Box::new(plan),
1195 mfp,
1196 input_key_val: Some((key, val)),
1197 }
1198 .as_plan(lir_id)
1199 }
1200 } else {
1201 let lir_id = self.allocate_lir_id();
1202 plan = PlanNode::Mfp {
1203 input: Box::new(plan),
1204 mfp,
1205 input_key_val,
1206 }
1207 .as_plan(lir_id);
1208 keys = AvailableCollections::new_raw();
1209 }
1210 }
1211
1212 Ok(LoweredExpr {
1213 plan,
1214 keys,
1215 has_future_updates,
1216 })
1217 }
1218
1219 /// Lowers a `Reduce` with the given fields and an `mfp_on_top`, which is the MFP that is
1220 /// originally on top of the `Reduce`. This MFP, or a part of it, might be fused into the
1221 /// `Reduce`, in which case `mfp_on_top` is mutated to be the residual MFP, i.e., what was not
1222 /// fused.
1223 fn lower_reduce(
1224 &mut self,
1225 input: &MirRelationExpr,
1226 group_key: &Vec<MirScalarExpr>,
1227 aggregates: &Vec<AggregateExpr>,
1228 monotonic: &bool,
1229 expected_group_size: &Option<u64>,
1230 mfp_on_top: &mut MapFilterProject,
1231 fused_unnest_list: bool,
1232 ) -> Result<LoweredExpr, String> {
1233 let input_arity = input.arity();
1234 let LoweredExpr {
1235 plan: input,
1236 keys,
1237 has_future_updates: input_future,
1238 } = self.lower_mir_expr(input)?;
1239 let (input_key, permutation_and_new_arity) =
1240 if let Some((input_key, permutation, thinning)) = keys.arbitrary_arrangement() {
1241 (
1242 Some(input_key.clone()),
1243 Some((permutation.clone(), thinning.len() + input_key.len())),
1244 )
1245 } else {
1246 (None, None)
1247 };
1248 let key_val_plan = KeyValPlan::new(
1249 input_arity,
1250 group_key,
1251 aggregates,
1252 permutation_and_new_arity,
1253 );
1254 let reduce_plan = ReducePlan::create_from(
1255 aggregates.clone(),
1256 *monotonic,
1257 *expected_group_size,
1258 fused_unnest_list,
1259 );
1260 // Return the plan, and the keys it produces.
1261 let mfp_after;
1262 let output_arity;
1263 if self.enable_reduce_mfp_fusion {
1264 (mfp_after, *mfp_on_top, output_arity) =
1265 reduce_plan.extract_mfp_after(mfp_on_top.clone(), group_key.len());
1266 } else {
1267 (mfp_after, output_arity) = (
1268 MapFilterProject::new(mfp_on_top.input_arity),
1269 group_key.len() + aggregates.len(),
1270 );
1271 }
1272 soft_assert_eq_or_log!(
1273 mfp_on_top.input_arity,
1274 output_arity,
1275 "Output arity of reduce must match input arity for MFP on top of it"
1276 );
1277 let output_keys = reduce_plan.keys(group_key.len(), output_arity);
1278 let lir_id = self.allocate_lir_id();
1279 // `Reduce` builds its own input arrangement inside `render_reduce` (via `KeyValPlan`),
1280 // bypassing `ensure_collections`. So we can't piggy-back on an upstream `ArrangeBy`'s
1281 // strategy to request temporal bucketing on a temporal-MFP-fed input: there is no such
1282 // `ArrangeBy`. Instead we record the strategy directly on the `Reduce` node, and
1283 // `render_reduce` applies bucketing to the keyed `(key, val)` stream itself.
1284 let temporal_bucketing_strategy = strategy_from_future(input_future);
1285 // (This can't currently happen due to `extract_mfp_after` separating out any temporal part.)
1286 let has_future_updates = mfp_after.has_temporal_predicates();
1287 Ok(LoweredExpr {
1288 plan: PlanNode::Reduce {
1289 input_key,
1290 input: Box::new(input),
1291 key_val_plan,
1292 plan: reduce_plan,
1293 mfp_after,
1294 temporal_bucketing_strategy,
1295 }
1296 .as_plan(lir_id),
1297 keys: output_keys,
1298 has_future_updates,
1299 })
1300 }
1301
1302 /// Replace the plan with another one
1303 /// that has the collection in some additional forms.
1304 pub fn arrange_by(
1305 &mut self,
1306 plan: Plan,
1307 collections: AvailableCollections,
1308 old_collections: &AvailableCollections,
1309 arity: usize,
1310 has_future_updates: bool,
1311 ) -> Plan {
1312 if let Plan {
1313 node:
1314 PlanNode::ArrangeBy {
1315 input_key,
1316 input,
1317 input_mfp,
1318 mut forms,
1319 strategy,
1320 },
1321 lir_id,
1322 } = plan
1323 {
1324 forms.raw |= collections.raw;
1325 forms.arranged.extend(collections.arranged);
1326 forms.arranged.sort_by(|k1, k2| k1.0.cmp(&k2.0));
1327 forms.arranged.dedup_by(|k1, k2| k1.0 == k2.0);
1328 PlanNode::ArrangeBy {
1329 input_key,
1330 input,
1331 input_mfp,
1332 forms,
1333 strategy,
1334 }
1335 .as_plan(lir_id)
1336 } else {
1337 let (input_key, input_mfp) = if let Some((input_key, permutation, thinning)) =
1338 old_collections.arbitrary_arrangement()
1339 {
1340 let mut mfp = MapFilterProject::new(arity);
1341 mfp.permute_fn(|c| permutation[c], thinning.len() + input_key.len());
1342 (Some(input_key.clone()), mfp)
1343 } else {
1344 (None, MapFilterProject::new(arity))
1345 };
1346 let lir_id = self.allocate_lir_id();
1347
1348 PlanNode::ArrangeBy {
1349 input_key,
1350 input: Box::new(plan),
1351 input_mfp,
1352 forms: collections,
1353 strategy: strategy_from_future(has_future_updates),
1354 }
1355 .as_plan(lir_id)
1356 }
1357 }
1358}
1359
/// Various bits of state to print along with error messages during LIR planning,
/// to aid debugging.
///
/// The `Display` implementation renders both fields on a single line.
#[derive(Clone, Debug)]
pub struct LirDebugInfo {
    /// Name printed after the `Debug name:` label by the `Display` implementation.
    debug_name: String,
    /// Identifier printed after the `id:` label by the `Display` implementation.
    id: GlobalId,
}
1367
1368impl std::fmt::Display for LirDebugInfo {
1369 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1370 write!(f, "Debug name: {}; id: {}", self.debug_name, self.id)
1371 }
1372}