Skip to main content

mz_compute_types/explain/
text.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `EXPLAIN ... AS TEXT` support for LIR structures.
11//!
12//! The format adheres to the following conventions:
13//! 1. In general, every line that starts with an uppercase character
14//!    corresponds to a [`Plan`] variant.
15//! 2. Whenever the variant has an attached `~Plan`, the printed name is
16//!    `$V::$P` where `$V` identifies the variant and `$P` the plan.
17//! 3. The fields of a `~Plan` struct attached to a [`Plan`] are rendered as if
18//!    they were part of the variant themself.
19//! 4. Non-recursive parameters of each sub-plan are written as `$key=$val`
20//!    pairs on the same line or as lowercase `$key` fields on indented lines.
21//! 5. A single non-recursive parameter can be written just as `$val`.
22
23use std::fmt;
24use std::ops::Deref;
25
26use itertools::Itertools;
27use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
28use mz_expr::{Id, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_ore::str::{IndentLike, StrExt, separated};
31use mz_repr::explain::text::DisplayText;
32use mz_repr::explain::{
33    CompactScalarSeq, CompactScalars, ExplainConfig, ExprHumanizer, Indices, PlanRenderingContext,
34};
35
36use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
37use crate::plan::join::linear_join::LinearStagePlan;
38use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
39use crate::plan::reduce::{
40    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, MonotonicPlan, SingleBasicPlan,
41};
42use crate::plan::threshold::ThresholdPlan;
43use crate::plan::{ArrangementStrategy, AvailableCollections, LirId, Plan, PlanNode};
44
45impl DisplayText<PlanRenderingContext<'_, Plan>> for Plan {
46    fn fmt_text(
47        &self,
48        f: &mut fmt::Formatter<'_>,
49        ctx: &mut PlanRenderingContext<'_, Plan>,
50    ) -> fmt::Result {
51        if ctx.config.verbose_syntax {
52            self.fmt_verbose_text(f, ctx)
53        } else {
54            self.fmt_default_text(f, ctx)
55        }
56    }
57}
58
59impl Plan {
60    // NOTE: This code needs to be kept in sync with the `Display` instance for
61    // `RenderPlan:ExprHumanizer`.
62    //
63    // This code determines what you see in `EXPLAIN`; that other code
64    // determine what you see when you run `mz_lir_mapping`.
65    fn fmt_default_text(
66        &self,
67        f: &mut fmt::Formatter<'_>,
68        ctx: &mut PlanRenderingContext<'_, Plan>,
69    ) -> fmt::Result {
70        use PlanNode::*;
71
72        let mode = HumanizedExplain::new(ctx.config.redacted);
73        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
74
75        match &self.node {
76            Constant { rows } => {
77                write!(f, "{}→Constant ", ctx.indent)?;
78
79                match rows {
80                    Ok(rows) => write!(
81                        f,
82                        "({} row{})",
83                        rows.len(),
84                        if rows.len() == 1 { "" } else { "s" }
85                    )?,
86                    Err(err) => {
87                        if mode.redacted() {
88                            write!(f, "(error: █)")?;
89                        } else {
90                            write!(f, "(error: {})", err.to_string().quoted(),)?;
91                        }
92                    }
93                }
94
95                writeln!(f, "{annotations}")?;
96            }
97            Get { id, keys, plan } => {
98                ctx.indent.set(); // mark the current indent level
99
100                // Resolve the id as a string.
101                let id = match id {
102                    Id::Local(id) => id.to_string(),
103                    Id::Global(id) => ctx
104                        .humanizer
105                        .humanize_id(*id)
106                        .unwrap_or_else(|| id.to_string()),
107                };
108                // Render plan-specific fields.
109                use crate::plan::GetPlan;
110                match plan {
111                    GetPlan::PassArrangements => {
112                        if keys.raw && keys.arranged.is_empty() {
113                            writeln!(f, "{}→Stream {id}{annotations}", ctx.indent)?;
114                        } else {
115                            // we're not reporting on whether or not `raw` is set
116                            // we're not reporting on how many arrangements there are
117                            writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
118                        }
119                    }
120                    GetPlan::Arrangement(key, Some(val), mfp) => {
121                        if !mfp.is_identity() {
122                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
123                            ctx.indent += 1;
124                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
125                            ctx.indent += 1;
126                        }
127
128                        writeln!(f, "{}→Index Lookup on {id}{annotations}", ctx.indent)?;
129                        ctx.indent += 1;
130                        let key = CompactScalars(mode.seq(key, None));
131                        write!(f, "{}Key: ({key}) ", ctx.indent)?;
132                        let val = mode.expr(val, None);
133                        writeln!(f, "Value: {val}")?;
134                    }
135                    GetPlan::Arrangement(key, None, mfp) => {
136                        if !mfp.is_identity() {
137                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
138                            ctx.indent += 1;
139                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
140                            ctx.indent += 1;
141                        }
142
143                        writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
144                        ctx.indent += 1;
145                        let key = CompactScalars(mode.seq(key, None));
146                        writeln!(f, "{}Key: ({key})", ctx.indent)?;
147                    }
148                    GetPlan::Collection(mfp) => {
149                        if !mfp.is_identity() {
150                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
151                            ctx.indent += 1;
152                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
153                            ctx.indent += 1;
154                        }
155
156                        writeln!(f, "{}→Read {id}{annotations}", ctx.indent)?;
157                    }
158                }
159                ctx.indent.reset(); // reset the original indent level
160            }
161            Let { id, value, body } => {
162                let mut bindings = vec![(id, value.as_ref())];
163                let mut head = body.as_ref();
164
165                // Render Let-blocks nested in the body an outer Let-block in one step
166                // with a flattened list of bindings
167                while let Let { id, value, body } = &head.node {
168                    bindings.push((id, value.as_ref()));
169                    head = body.as_ref();
170                }
171
172                writeln!(f, "{}→With", ctx.indent)?;
173                ctx.indented(|ctx| {
174                    for (id, value) in bindings.iter() {
175                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
176                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
177                    }
178                    Ok(())
179                })?;
180                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
181                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
182            }
183            LetRec {
184                ids,
185                values,
186                limits,
187                body,
188            } => {
189                let head = body.as_ref();
190
191                writeln!(f, "{}→With Mutually Recursive", ctx.indent)?;
192                ctx.indented(|ctx| {
193                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
194                    for ((id, value), limit) in bindings {
195                        if let Some(limit) = limit {
196                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
197                        } else {
198                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
199                        }
200                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
201                    }
202                    Ok(())
203                })?;
204                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
205                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
206            }
207            Mfp {
208                input,
209                mfp,
210                input_key_val: _,
211            } => {
212                writeln!(f, "{}→Map/Filter/Project{annotations}", ctx.indent)?;
213                ctx.indent.set();
214
215                ctx.indent += 1;
216                mode.expr(mfp, None).fmt_default_text(f, ctx)?;
217
218                // one more nesting level if we showed anything for the MFP
219                if !mfp.is_identity() {
220                    ctx.indent += 1;
221                }
222                input.fmt_text(f, ctx)?;
223                ctx.indent.reset();
224            }
225            FlatMap {
226                input_key: _,
227                input,
228                exprs,
229                func,
230                mfp_after,
231            } => {
232                ctx.indent.set();
233                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
234                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
235                    ctx.indent += 1;
236                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
237                    ctx.indent += 1;
238                }
239
240                let exprs = mode.seq(exprs, None);
241                let exprs = CompactScalars(exprs);
242                writeln!(
243                    f,
244                    "{}→Table Function {func}({exprs}){annotations}",
245                    ctx.indent
246                )?;
247                ctx.indent += 1;
248
249                input.fmt_text(f, ctx)?;
250
251                ctx.indent.reset();
252            }
253            Join { inputs, plan } => {
254                use crate::plan::join::JoinPlan;
255                match plan {
256                    JoinPlan::Linear(plan) => {
257                        let label = if plan.has_cross_stage() {
258                            "→Differential Cross Join"
259                        } else {
260                            "→Differential Join"
261                        };
262                        write!(f, "{}{label} ", ctx.indent)?;
263                        fmt_join_chain(
264                            f,
265                            ctx.humanizer,
266                            &mode,
267                            inputs,
268                            plan.source_relation,
269                            plan.source_key.as_ref(),
270                            plan.stage_plans
271                                .iter()
272                                .map(|s| (s.lookup_relation, &s.lookup_key)),
273                        )?;
274                        writeln!(f, "{annotations}")?;
275                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
276                    }
277                    JoinPlan::Delta(plan) => {
278                        let label = if plan.has_cross_stage() {
279                            "→Delta Cross Join"
280                        } else {
281                            "→Delta Join"
282                        };
283                        write!(f, "{}{label}", ctx.indent)?;
284                        for dpp in &plan.path_plans {
285                            write!(f, " [")?;
286                            fmt_join_chain(
287                                f,
288                                ctx.humanizer,
289                                &mode,
290                                inputs,
291                                dpp.source_relation,
292                                Some(&dpp.source_key),
293                                dpp.stage_plans
294                                    .iter()
295                                    .map(|s| (s.lookup_relation, &s.lookup_key)),
296                            )?;
297                            write!(f, "]")?;
298                        }
299                        writeln!(f, "{annotations}")?;
300                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
301                    }
302                }
303
304                ctx.indented(|ctx| {
305                    for input in inputs {
306                        input.fmt_text(f, ctx)?;
307                    }
308                    Ok(())
309                })?;
310            }
311            Reduce {
312                input_key: _,
313                input,
314                key_val_plan,
315                plan,
316                mfp_after,
317                temporal_bucketing_strategy,
318            } => {
319                ctx.indent.set();
320                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
321                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
322                    ctx.indent += 1;
323                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
324                    ctx.indent += 1;
325                }
326
327                let temporally_bucketed = matches!(
328                    temporal_bucketing_strategy,
329                    ArrangementStrategy::TemporalBucketing
330                );
331
332                use crate::plan::reduce::ReducePlan;
333                match plan {
334                    ReducePlan::Distinct => {
335                        write!(f, "{}→", ctx.indent)?;
336                        if temporally_bucketed {
337                            write!(f, "Temporally-Bucketed ")?;
338                        }
339                        writeln!(f, "Distinct GroupAggregate{annotations}")?;
340                    }
341                    ReducePlan::Accumulable(plan) => {
342                        write!(f, "{}→", ctx.indent)?;
343                        if temporally_bucketed {
344                            write!(f, "Temporally-Bucketed ")?;
345                        }
346                        writeln!(f, "Accumulable GroupAggregate{annotations}")?;
347                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
348                    }
349                    ReducePlan::Hierarchical(
350                        plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }),
351                    ) => {
352                        write!(f, "{}→", ctx.indent)?;
353                        if temporally_bucketed {
354                            write!(f, "Temporally-Bucketed ")?;
355                        }
356                        write!(f, "Bucketed Hierarchical GroupAggregate (buckets:")?;
357                        for bucket in buckets {
358                            write!(f, " {bucket}")?;
359                        }
360                        writeln!(f, "){annotations}")?;
361                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
362                    }
363                    ReducePlan::Hierarchical(
364                        plan @ HierarchicalPlan::Monotonic(MonotonicPlan {
365                            must_consolidate, ..
366                        }),
367                    ) => {
368                        write!(f, "{}→", ctx.indent)?;
369                        if temporally_bucketed {
370                            write!(f, "Temporally-Bucketed ")?;
371                        }
372                        if *must_consolidate {
373                            write!(f, "Consolidating ")?;
374                        }
375                        writeln!(f, "Monotonic GroupAggregate{annotations}",)?;
376                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
377                    }
378                    ReducePlan::Basic(plan) => {
379                        ctx.indent.set();
380                        if let BasicPlan::Single(SingleBasicPlan {
381                            fused_unnest_list, ..
382                        }) = &plan
383                        {
384                            if *fused_unnest_list {
385                                writeln!(
386                                    f,
387                                    "{}→Fused with Child Table Function unnest_list",
388                                    ctx.indent
389                                )?;
390                                ctx.indent += 1;
391                            }
392                        }
393                        write!(f, "{}→", ctx.indent)?;
394                        if temporally_bucketed {
395                            write!(f, "Temporally-Bucketed ")?;
396                        }
397                        writeln!(f, "Non-incremental GroupAggregate{annotations}")?;
398                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
399                        ctx.indent.reset();
400                    }
401                }
402
403                ctx.indented(|ctx| {
404                    let kvp = key_val_plan.key_plan.deref();
405                    if !kvp.is_identity() {
406                        writeln!(f, "{}Key:", ctx.indent)?;
407                        ctx.indented(|ctx| {
408                            let key_plan = mode.expr(kvp, None);
409                            key_plan.fmt_default_text(f, ctx)
410                        })?;
411                    }
412
413                    input.fmt_text(f, ctx)
414                })?;
415
416                ctx.indent.reset();
417            }
418            TopK {
419                input,
420                top_k_plan,
421                temporal_bucketing_strategy,
422            } => {
423                let temporally_bucketed = matches!(
424                    temporal_bucketing_strategy,
425                    ArrangementStrategy::TemporalBucketing
426                );
427                use crate::plan::top_k::TopKPlan;
428                match top_k_plan {
429                    TopKPlan::MonotonicTop1(plan) => {
430                        write!(f, "{}→", ctx.indent)?;
431                        if temporally_bucketed {
432                            write!(f, "Temporally-Bucketed ")?;
433                        }
434                        if plan.must_consolidate {
435                            write!(f, "Consolidating ")?;
436                        }
437                        writeln!(f, "Monotonic Top1{annotations}")?;
438
439                        ctx.indented(|ctx| {
440                            if plan.group_key.len() > 0 {
441                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
442                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
443                            }
444                            if plan.order_key.len() > 0 {
445                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
446                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
447                            }
448                            Ok(())
449                        })?;
450                    }
451                    TopKPlan::MonotonicTopK(plan) => {
452                        write!(f, "{}→", ctx.indent)?;
453                        if temporally_bucketed {
454                            write!(f, "Temporally-Bucketed ")?;
455                        }
456                        if plan.must_consolidate {
457                            write!(f, "Consolidating ")?;
458                        }
459                        writeln!(f, "Monotonic TopK{annotations}")?;
460
461                        ctx.indented(|ctx| {
462                            if plan.group_key.len() > 0 {
463                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
464                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
465                            }
466                            if plan.order_key.len() > 0 {
467                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
468                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
469                            }
470                            if let Some(limit) = &plan.limit {
471                                let limit = mode.expr(limit, None);
472                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
473                            }
474                            Ok(())
475                        })?;
476                    }
477                    TopKPlan::Basic(plan) => {
478                        write!(f, "{}→", ctx.indent)?;
479                        if temporally_bucketed {
480                            write!(f, "Temporally-Bucketed ")?;
481                        }
482                        writeln!(f, "Non-monotonic TopK{annotations}")?;
483
484                        ctx.indented(|ctx| {
485                            if plan.group_key.len() > 0 {
486                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
487                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
488                            }
489                            if plan.order_key.len() > 0 {
490                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
491                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
492                            }
493                            if let Some(limit) = &plan.limit {
494                                let limit = mode.expr(limit, None);
495                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
496                            }
497                            if plan.offset != 0 {
498                                let offset = plan.offset;
499                                writeln!(f, "{}Offset {offset}", ctx.indent)?;
500                            }
501                            Ok(())
502                        })?;
503                    }
504                }
505
506                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
507            }
508            Negate { input } => {
509                writeln!(f, "{}→Negate Diffs{annotations}", ctx.indent)?;
510
511                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
512            }
513            Threshold {
514                input,
515                threshold_plan,
516            } => {
517                match threshold_plan {
518                    ThresholdPlan::Basic(plan) => {
519                        write!(f, "{}→Threshold Diffs ", ctx.indent)?;
520                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
521                        ensure_arrangement.fmt_text(f, ctx)?;
522                        writeln!(f, "{annotations}")?;
523                    }
524                };
525
526                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
527            }
528            Union {
529                inputs,
530                consolidate_output,
531                temporal_bucketing_strategies,
532            } => {
533                let any_temporally_bucketed = temporal_bucketing_strategies
534                    .iter()
535                    .any(|s| matches!(s, ArrangementStrategy::TemporalBucketing));
536                write!(f, "{}→", ctx.indent)?;
537                if any_temporally_bucketed {
538                    write!(f, "Temporally-Bucketed ")?;
539                }
540                if *consolidate_output {
541                    write!(f, "Consolidating ")?;
542                }
543                writeln!(f, "Union{annotations}")?;
544
545                ctx.indented(|ctx| {
546                    for input in inputs.iter() {
547                        input.fmt_text(f, ctx)?;
548                    }
549                    Ok(())
550                })?;
551            }
552            ArrangeBy {
553                input_key: _,
554                input,
555                input_mfp,
556                forms,
557                strategy: _,
558            } => {
559                ctx.indent.set();
560                if forms.raw && forms.arranged.is_empty() {
561                    soft_assert_or_log!(forms.raw, "raw stream with no arrangements");
562                    writeln!(f, "{}→Unarranged Raw Stream{annotations}", ctx.indent)?;
563                } else {
564                    write!(f, "{}→Arrange", ctx.indent)?;
565
566                    if !forms.arranged.is_empty() {
567                        let mode = HumanizedExplain::new(ctx.config.redacted);
568                        for (key, _, _) in &forms.arranged {
569                            if !key.is_empty() {
570                                let key = mode.seq(key, None);
571                                let key = CompactScalars(key);
572                                write!(f, " ({key})")?;
573                            } else {
574                                write!(f, " (empty key)")?;
575                            }
576                        }
577                    }
578                    writeln!(f, "{annotations}")?;
579                }
580
581                if !input_mfp.is_identity() {
582                    ctx.indent += 1;
583                    writeln!(f, "{}→Fused with Parent Map/Filter/Project", ctx.indent)?;
584                    ctx.indented(|ctx| mode.expr(input_mfp, None).fmt_default_text(f, ctx))?;
585                }
586
587                ctx.indent += 1;
588                input.fmt_text(f, ctx)?;
589                ctx.indent.reset();
590            }
591        }
592
593        Ok(())
594    }
595
596    fn fmt_verbose_text(
597        &self,
598        f: &mut fmt::Formatter<'_>,
599        ctx: &mut PlanRenderingContext<'_, Plan>,
600    ) -> fmt::Result {
601        use PlanNode::*;
602
603        let mode = HumanizedExplain::new(ctx.config.redacted);
604        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
605
606        match &self.node {
607            Constant { rows } => match rows {
608                Ok(rows) => {
609                    if !rows.is_empty() {
610                        writeln!(f, "{}Constant{}", ctx.indent, annotations)?;
611                        ctx.indented(|ctx| {
612                            fmt_text_constant_rows(
613                                f,
614                                rows.iter().map(|(data, _, diff)| (data, diff)),
615                                &mut ctx.indent,
616                                ctx.config.redacted,
617                            )
618                        })?;
619                    } else {
620                        writeln!(f, "{}Constant <empty>{}", ctx.indent, annotations)?;
621                    }
622                }
623                Err(err) => {
624                    if mode.redacted() {
625                        writeln!(f, "{}Error █{}", ctx.indent, annotations)?;
626                    } else {
627                        {
628                            writeln!(
629                                f,
630                                "{}Error {}{}",
631                                ctx.indent,
632                                err.to_string().quoted(),
633                                annotations
634                            )?;
635                        }
636                    }
637                }
638            },
639
640            Get { id, keys, plan } => {
641                ctx.indent.set(); // mark the current indent level
642
643                // Resolve the id as a string.
644                let id = match id {
645                    Id::Local(id) => id.to_string(),
646                    Id::Global(id) => ctx
647                        .humanizer
648                        .humanize_id(*id)
649                        .unwrap_or_else(|| id.to_string()),
650                };
651                // Render plan-specific fields.
652                use crate::plan::GetPlan;
653                match plan {
654                    GetPlan::PassArrangements => {
655                        writeln!(
656                            f,
657                            "{}Get::PassArrangements {}{}",
658                            ctx.indent, id, annotations
659                        )?;
660                        ctx.indent += 1;
661                    }
662                    GetPlan::Arrangement(key, val, mfp) => {
663                        writeln!(f, "{}Get::Arrangement {}{}", ctx.indent, id, annotations)?;
664                        ctx.indent += 1;
665                        mode.expr(mfp, None).fmt_text(f, ctx)?;
666                        {
667                            let key = mode.seq(key, None);
668                            let key = CompactScalars(key);
669                            writeln!(f, "{}key={}", ctx.indent, key)?;
670                        }
671                        if let Some(val) = val {
672                            let val = mode.expr(val, None);
673                            writeln!(f, "{}val={}", ctx.indent, val)?;
674                        }
675                    }
676                    GetPlan::Collection(mfp) => {
677                        writeln!(f, "{}Get::Collection {}{}", ctx.indent, id, annotations)?;
678                        ctx.indent += 1;
679                        mode.expr(mfp, None).fmt_text(f, ctx)?;
680                    }
681                }
682
683                // Render plan-agnostic fields (common for all plans for this variant).
684                keys.fmt_text(f, ctx)?;
685
686                ctx.indent.reset(); // reset the original indent level
687            }
688            Let { id, value, body } => {
689                let mut bindings = vec![(id, value.as_ref())];
690                let mut head = body.as_ref();
691
692                // Render Let-blocks nested in the body an outer Let-block in one step
693                // with a flattened list of bindings
694                while let Let { id, value, body } = &head.node {
695                    bindings.push((id, value.as_ref()));
696                    head = body.as_ref();
697                }
698
699                writeln!(f, "{}With", ctx.indent)?;
700                ctx.indented(|ctx| {
701                    for (id, value) in bindings.iter() {
702                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
703                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
704                    }
705                    Ok(())
706                })?;
707                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
708                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
709            }
710            LetRec {
711                ids,
712                values,
713                limits,
714                body,
715            } => {
716                let head = body.as_ref();
717
718                writeln!(f, "{}With Mutually Recursive", ctx.indent)?;
719                ctx.indented(|ctx| {
720                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
721                    for ((id, value), limit) in bindings {
722                        if let Some(limit) = limit {
723                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
724                        } else {
725                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
726                        }
727                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
728                    }
729                    Ok(())
730                })?;
731                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
732                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
733            }
734            Mfp {
735                input,
736                mfp,
737                input_key_val,
738            } => {
739                writeln!(f, "{}Mfp{}", ctx.indent, annotations)?;
740                ctx.indented(|ctx| {
741                    mode.expr(mfp, None).fmt_text(f, ctx)?;
742                    if let Some((key, val)) = input_key_val {
743                        {
744                            let key = mode.seq(key, None);
745                            let key = CompactScalars(key);
746                            writeln!(f, "{}input_key={}", ctx.indent, key)?;
747                        }
748                        if let Some(val) = val {
749                            let val = mode.expr(val, None);
750                            writeln!(f, "{}input_val={}", ctx.indent, val)?;
751                        }
752                    }
753                    input.fmt_text(f, ctx)
754                })?;
755            }
756            FlatMap {
757                input_key,
758                input,
759                exprs,
760                func,
761                mfp_after,
762            } => {
763                let exprs = mode.seq(exprs, None);
764                let exprs = CompactScalars(exprs);
765                writeln!(
766                    f,
767                    "{}FlatMap {}({}){}",
768                    ctx.indent, func, exprs, annotations
769                )?;
770                ctx.indented(|ctx| {
771                    if let Some(key) = input_key {
772                        let key = mode.seq(key, None);
773                        let key = CompactScalars(key);
774                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
775                    }
776                    if !mfp_after.is_identity() {
777                        writeln!(f, "{}mfp_after", ctx.indent)?;
778                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
779                    }
780                    input.fmt_text(f, ctx)
781                })?;
782            }
783            Join { inputs, plan } => {
784                use crate::plan::join::JoinPlan;
785                match plan {
786                    JoinPlan::Linear(plan) => {
787                        writeln!(f, "{}Join::Linear{}", ctx.indent, annotations)?;
788                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
789                    }
790                    JoinPlan::Delta(plan) => {
791                        writeln!(f, "{}Join::Delta{}", ctx.indent, annotations)?;
792                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
793                    }
794                }
795                ctx.indented(|ctx| {
796                    for input in inputs {
797                        input.fmt_text(f, ctx)?;
798                    }
799                    Ok(())
800                })?;
801            }
802            Reduce {
803                input_key,
804                input,
805                key_val_plan,
806                plan,
807                mfp_after,
808                temporal_bucketing_strategy,
809            } => {
810                use crate::plan::reduce::ReducePlan;
811                match plan {
812                    ReducePlan::Distinct => {
813                        writeln!(f, "{}Reduce::Distinct{}", ctx.indent, annotations)?;
814                    }
815                    ReducePlan::Accumulable(plan) => {
816                        writeln!(f, "{}Reduce::Accumulable{}", ctx.indent, annotations)?;
817                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
818                    }
819                    ReducePlan::Hierarchical(plan) => {
820                        writeln!(f, "{}Reduce::Hierarchical{}", ctx.indent, annotations)?;
821                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
822                    }
823                    ReducePlan::Basic(plan) => {
824                        writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
825                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
826                    }
827                }
828                ctx.indented(|ctx| {
829                    if let Some(key) = input_key {
830                        let key = mode.seq(key, None);
831                        let key = CompactScalars(key);
832                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
833                    }
834                    if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) {
835                        writeln!(
836                            f,
837                            "{}temporal_bucketing_strategy={}",
838                            ctx.indent, temporal_bucketing_strategy
839                        )?;
840                    }
841                    if key_val_plan.key_plan.deref().is_identity() {
842                        writeln!(f, "{}key_plan=id", ctx.indent)?;
843                    } else {
844                        writeln!(f, "{}key_plan", ctx.indent)?;
845                        ctx.indented(|ctx| {
846                            let key_plan = mode.expr(key_val_plan.key_plan.deref(), None);
847                            key_plan.fmt_text(f, ctx)
848                        })?;
849                    }
850                    if key_val_plan.val_plan.deref().is_identity() {
851                        writeln!(f, "{}val_plan=id", ctx.indent)?;
852                    } else {
853                        writeln!(f, "{}val_plan", ctx.indent)?;
854                        ctx.indented(|ctx| {
855                            let val_plan = mode.expr(key_val_plan.val_plan.deref(), None);
856                            val_plan.fmt_text(f, ctx)
857                        })?;
858                    }
859                    if !mfp_after.is_identity() {
860                        writeln!(f, "{}mfp_after", ctx.indent)?;
861                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
862                    }
863
864                    input.fmt_text(f, ctx)
865                })?;
866            }
867            TopK {
868                input,
869                top_k_plan,
870                temporal_bucketing_strategy,
871            } => {
872                use crate::plan::top_k::TopKPlan;
873                match top_k_plan {
874                    TopKPlan::MonotonicTop1(plan) => {
875                        write!(f, "{}TopK::MonotonicTop1", ctx.indent)?;
876                        if plan.group_key.len() > 0 {
877                            let group_by = mode.seq(&plan.group_key, None);
878                            let group_by = CompactScalars(group_by);
879                            write!(f, " group_by=[{}]", group_by)?;
880                        }
881                        if plan.order_key.len() > 0 {
882                            let order_by = mode.seq(&plan.order_key, None);
883                            let order_by = separated(", ", order_by);
884                            write!(f, " order_by=[{}]", order_by)?;
885                        }
886                        if plan.must_consolidate {
887                            write!(f, " must_consolidate")?;
888                        }
889                    }
890                    TopKPlan::MonotonicTopK(plan) => {
891                        write!(f, "{}TopK::MonotonicTopK", ctx.indent)?;
892                        if plan.group_key.len() > 0 {
893                            let group_by = mode.seq(&plan.group_key, None);
894                            let group_by = CompactScalars(group_by);
895                            write!(f, " group_by=[{}]", group_by)?;
896                        }
897                        if plan.order_key.len() > 0 {
898                            let order_by = mode.seq(&plan.order_key, None);
899                            let order_by = separated(", ", order_by);
900                            write!(f, " order_by=[{}]", order_by)?;
901                        }
902                        if let Some(limit) = &plan.limit {
903                            let limit = mode.expr(limit, None);
904                            write!(f, " limit={}", limit)?;
905                        }
906                        if plan.must_consolidate {
907                            write!(f, " must_consolidate")?;
908                        }
909                    }
910                    TopKPlan::Basic(plan) => {
911                        write!(f, "{}TopK::Basic", ctx.indent)?;
912                        if plan.group_key.len() > 0 {
913                            let group_by = mode.seq(&plan.group_key, None);
914                            let group_by = CompactScalars(group_by);
915                            write!(f, " group_by=[{}]", group_by)?;
916                        }
917                        if plan.order_key.len() > 0 {
918                            let order_by = mode.seq(&plan.order_key, None);
919                            let order_by = separated(", ", order_by);
920                            write!(f, " order_by=[{}]", order_by)?;
921                        }
922                        if let Some(limit) = &plan.limit {
923                            let limit = mode.expr(limit, None);
924                            write!(f, " limit={}", limit)?;
925                        }
926                        if &plan.offset > &0 {
927                            write!(f, " offset={}", plan.offset)?;
928                        }
929                    }
930                }
931                writeln!(f, "{}", annotations)?;
932                ctx.indented(|ctx| {
933                    if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) {
934                        writeln!(
935                            f,
936                            "{}temporal_bucketing_strategy={}",
937                            ctx.indent, temporal_bucketing_strategy
938                        )?;
939                    }
940                    input.fmt_text(f, ctx)
941                })?;
942            }
943            Negate { input } => {
944                writeln!(f, "{}Negate{}", ctx.indent, annotations)?;
945                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
946            }
947            Threshold {
948                input,
949                threshold_plan,
950            } => {
951                use crate::plan::threshold::ThresholdPlan;
952                match threshold_plan {
953                    ThresholdPlan::Basic(plan) => {
954                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
955                        write!(f, "{}Threshold::Basic", ctx.indent)?;
956                        write!(f, " ensure_arrangement=")?;
957                        ensure_arrangement.fmt_text(f, ctx)?;
958                        writeln!(f, "{}", annotations)?;
959                    }
960                };
961                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
962            }
963            Union {
964                inputs,
965                consolidate_output,
966                temporal_bucketing_strategies,
967            } => {
968                if *consolidate_output {
969                    writeln!(
970                        f,
971                        "{}Union consolidate_output={}{}",
972                        ctx.indent, consolidate_output, annotations
973                    )?;
974                } else {
975                    writeln!(f, "{}Union{}", ctx.indent, annotations)?;
976                }
977                ctx.indented(|ctx| {
978                    if temporal_bucketing_strategies
979                        .iter()
980                        .any(|s| !matches!(s, ArrangementStrategy::Direct))
981                    {
982                        let strategies = temporal_bucketing_strategies
983                            .iter()
984                            .map(|s| format!("{}", s))
985                            .collect::<Vec<_>>()
986                            .join(", ");
987                        writeln!(
988                            f,
989                            "{}temporal_bucketing_strategies=[{}]",
990                            ctx.indent, strategies
991                        )?;
992                    }
993                    for input in inputs.iter() {
994                        input.fmt_text(f, ctx)?;
995                    }
996                    Ok(())
997                })?;
998            }
999            ArrangeBy {
1000                input_key,
1001                input,
1002                input_mfp,
1003                forms,
1004                strategy,
1005            } => {
1006                writeln!(f, "{}ArrangeBy{}", ctx.indent, annotations)?;
1007                ctx.indented(|ctx| {
1008                    if let Some(key) = input_key {
1009                        let key = mode.seq(key, None);
1010                        let key = CompactScalars(key);
1011                        writeln!(f, "{}input_key=[{}]", ctx.indent, key)?;
1012                    }
1013                    if !matches!(strategy, ArrangementStrategy::Direct) {
1014                        writeln!(f, "{}strategy={:?}", ctx.indent, strategy)?;
1015                    }
1016                    mode.expr(input_mfp, None).fmt_text(f, ctx)?;
1017                    forms.fmt_text(f, ctx)?;
1018                    // Render input
1019                    input.fmt_text(f, ctx)
1020                })?;
1021            }
1022        }
1023
1024        Ok(())
1025    }
1026}
1027
1028impl DisplayText<PlanRenderingContext<'_, Plan>> for AvailableCollections {
1029    fn fmt_text(
1030        &self,
1031        f: &mut fmt::Formatter<'_>,
1032        ctx: &mut PlanRenderingContext<'_, Plan>,
1033    ) -> fmt::Result {
1034        if ctx.config.verbose_syntax {
1035            self.fmt_verbose_text(f, ctx)
1036        } else {
1037            self.fmt_default_text(f, ctx)
1038        }
1039    }
1040}
1041impl AvailableCollections {
1042    fn fmt_default_text(
1043        &self,
1044        f: &mut fmt::Formatter<'_>,
1045        ctx: &mut PlanRenderingContext<'_, Plan>,
1046    ) -> fmt::Result {
1047        let plural = if self.arranged.len() == 1 { "" } else { "s" };
1048        write!(
1049            f,
1050            "{}Keys: {} arrangement{plural} available",
1051            ctx.indent,
1052            self.arranged.len()
1053        )?;
1054
1055        if self.raw {
1056            writeln!(f, ", plus raw stream")?;
1057        } else {
1058            writeln!(f, ", no raw stream")?;
1059        }
1060
1061        ctx.indented(|ctx| {
1062            for (i, arrangement) in self.arranged.iter().enumerate() {
1063                let arrangement = Arrangement::from(arrangement);
1064                write!(f, "{}Arrangement {i}: ", ctx.indent)?;
1065                arrangement.fmt_text(f, ctx)?;
1066                writeln!(f, "")?;
1067            }
1068            Ok(())
1069        })?;
1070
1071        Ok(())
1072    }
1073
1074    fn fmt_verbose_text(
1075        &self,
1076        f: &mut fmt::Formatter<'_>,
1077        ctx: &mut PlanRenderingContext<'_, Plan>,
1078    ) -> fmt::Result {
1079        // raw field
1080        let raw = &self.raw;
1081        writeln!(f, "{}raw={}", ctx.indent, raw)?;
1082        // arranged field
1083        for (i, arrangement) in self.arranged.iter().enumerate() {
1084            let arrangement = Arrangement::from(arrangement);
1085            write!(f, "{}arrangements[{}]=", ctx.indent, i)?;
1086            arrangement.fmt_text(f, ctx)?;
1087            writeln!(f, "")?;
1088        }
1089        Ok(())
1090    }
1091}
1092
1093/// Format a join implementation chain like `%0:t[#0{a}] » %1:u[#0{c}] » %2[×]`.
1094///
1095/// Each position is rendered as `%pos:name` when an underlying [`PlanNode::Get`] can be
1096/// dug out of the corresponding input plan (see [`humanize_input_name`]),
1097/// otherwise just `%pos`. `[×]` (U+00D7) marks a cross product (empty lookup
1098/// key). A `None` `source_key` renders the source position with no bracketed
1099/// suffix.
1100fn fmt_join_chain<'a, I>(
1101    f: &mut fmt::Formatter<'_>,
1102    humanizer: &dyn ExprHumanizer,
1103    mode: &HumanizedExplain,
1104    inputs: &[Plan],
1105    source_relation: usize,
1106    source_key: Option<&'a Vec<MirScalarExpr>>,
1107    stages: I,
1108) -> fmt::Result
1109where
1110    I: IntoIterator<Item = (usize, &'a Vec<MirScalarExpr>)>,
1111{
1112    write!(
1113        f,
1114        "{}",
1115        humanize_input_name(humanizer, &inputs[source_relation], source_relation)
1116    )?;
1117    if let Some(key) = source_key {
1118        fmt_join_key_brackets(f, mode, key)?;
1119    }
1120    for (lookup_relation, lookup_key) in stages {
1121        write!(
1122            f,
1123            " » {}",
1124            humanize_input_name(humanizer, &inputs[lookup_relation], lookup_relation)
1125        )?;
1126        fmt_join_key_brackets(f, mode, lookup_key)?;
1127    }
1128    Ok(())
1129}
1130
1131/// Render `[k1, k2, …]` for a non-empty join key, or `[×]` for a cross product.
1132fn fmt_join_key_brackets(
1133    f: &mut fmt::Formatter<'_>,
1134    mode: &HumanizedExplain,
1135    key: &Vec<MirScalarExpr>,
1136) -> fmt::Result {
1137    if key.is_empty() {
1138        write!(f, "[×]")
1139    } else {
1140        let key = CompactScalars(mode.seq(key, None));
1141        write!(f, "[{key}]")
1142    }
1143}
1144
1145/// Render a join input as `%pos:name` if we can dig a `Get` out of `plan`,
1146/// otherwise just `%pos`. Mirrors `dig_name_from_expr` in
1147/// `src/expr/src/explain/text.rs` for the MIR `EXPLAIN OPTIMIZED PLAN` output.
1148fn humanize_input_name(humanizer: &dyn ExprHumanizer, plan: &Plan, pos: usize) -> String {
1149    fn dig(humanizer: &dyn ExprHumanizer, plan: &Plan) -> Option<String> {
1150        use crate::plan::PlanNode::*;
1151        match &plan.node {
1152            Get { id, .. } => match id {
1153                Id::Local(lid) => Some(lid.to_string()),
1154                Id::Global(gid) => Some(
1155                    humanizer
1156                        .humanize_id_unqualified(*gid)
1157                        .unwrap_or_else(|| gid.to_string()),
1158                ),
1159            },
1160            // Transparent wrappers: keep digging.
1161            ArrangeBy { input, .. } => dig(humanizer, input),
1162            Mfp { input, .. } => dig(humanizer, input),
1163            _ => None,
1164        }
1165    }
1166    match dig(humanizer, plan) {
1167        Some(name) => format!("%{pos}:{name}"),
1168        None => format!("%{pos}"),
1169    }
1170}
1171
1172impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearJoinPlan {
1173    fn fmt_text(
1174        &self,
1175        f: &mut fmt::Formatter<'_>,
1176        ctx: &mut PlanRenderingContext<'_, Plan>,
1177    ) -> fmt::Result {
1178        if ctx.config.verbose_syntax {
1179            self.fmt_verbose_text(f, ctx)
1180        } else {
1181            self.fmt_default_text(f, ctx)
1182        }
1183    }
1184}
1185impl LinearJoinPlan {
1186    /// True iff at least one stage is a cross product (empty lookup key).
1187    fn has_cross_stage(&self) -> bool {
1188        self.stage_plans.iter().any(|s| s.lookup_key.is_empty())
1189    }
1190
1191    #[allow(clippy::needless_pass_by_ref_mut)]
1192    fn fmt_default_text(
1193        &self,
1194        f: &mut fmt::Formatter<'_>,
1195        ctx: &mut PlanRenderingContext<'_, Plan>,
1196    ) -> fmt::Result {
1197        // Per-stage closures (natural 0..N order). The header chain already
1198        // shows each stage's position + key, so we only emit a block when
1199        // there's a non-identity closure to attribute to that stage.
1200        for stage in self.stage_plans.iter() {
1201            if stage.closure.maps_or_filters() {
1202                writeln!(f, "{}after %{}:", ctx.indent, stage.lookup_relation)?;
1203                ctx.indented(|ctx| stage.closure.fmt_default_text(f, ctx))?;
1204            }
1205        }
1206        if let Some(final_closure) = &self.final_closure {
1207            if final_closure.maps_or_filters() {
1208                writeln!(f, "{}Final closure:", ctx.indent)?;
1209                ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))?;
1210            }
1211        }
1212        Ok(())
1213    }
1214
1215    fn fmt_verbose_text(
1216        &self,
1217        f: &mut fmt::Formatter<'_>,
1218        ctx: &mut PlanRenderingContext<'_, Plan>,
1219    ) -> fmt::Result {
1220        let mode = HumanizedExplain::new(ctx.config.redacted);
1221        let plan = self;
1222        if let Some(closure) = plan.final_closure.as_ref() {
1223            if !closure.is_identity() {
1224                writeln!(f, "{}final_closure", ctx.indent)?;
1225                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1226            }
1227        }
1228        for (i, plan) in plan.stage_plans.iter().enumerate() {
1229            writeln!(f, "{}linear_stage[{}]", ctx.indent, i)?;
1230            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1231        }
1232        if let Some(closure) = plan.initial_closure.as_ref() {
1233            if !closure.is_identity() {
1234                writeln!(f, "{}initial_closure", ctx.indent)?;
1235                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1236            }
1237        }
1238        match &plan.source_key {
1239            Some(source_key) => {
1240                let source_key = mode.seq(source_key, None);
1241                let source_key = CompactScalars(source_key);
1242                writeln!(
1243                    f,
1244                    "{}source={{ relation={}, key=[{}] }}",
1245                    ctx.indent, &plan.source_relation, source_key
1246                )?
1247            }
1248            None => writeln!(
1249                f,
1250                "{}source={{ relation={}, key=[] }}",
1251                ctx.indent, &plan.source_relation
1252            )?,
1253        };
1254        Ok(())
1255    }
1256}
1257
1258impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearStagePlan {
1259    fn fmt_text(
1260        &self,
1261        f: &mut fmt::Formatter<'_>,
1262        ctx: &mut PlanRenderingContext<'_, Plan>,
1263    ) -> fmt::Result {
1264        if ctx.config.verbose_syntax {
1265            self.fmt_verbose_text(f, ctx)
1266        } else {
1267            self.fmt_default_text(f, ctx)
1268        }
1269    }
1270}
1271impl LinearStagePlan {
1272    #[allow(clippy::needless_pass_by_ref_mut)]
1273    fn fmt_default_text(
1274        &self,
1275        f: &mut fmt::Formatter<'_>,
1276        ctx: &mut PlanRenderingContext<'_, Plan>,
1277    ) -> fmt::Result {
1278        // NB this code path should not be live, as fmt_default_text for
1279        // `LinearJoinPlan` prints out each stage already
1280        let lookup_relation = &self.lookup_relation;
1281        if !self.lookup_key.is_empty() {
1282            let lookup_key = CompactScalarSeq(&self.lookup_key);
1283            writeln!(
1284                f,
1285                "{}Lookup key {lookup_key} in %{lookup_relation}",
1286                ctx.indent
1287            )
1288        } else {
1289            writeln!(f, "{}Lookup in %{lookup_relation}", ctx.indent)
1290        }
1291    }
1292
1293    fn fmt_verbose_text(
1294        &self,
1295        f: &mut fmt::Formatter<'_>,
1296        ctx: &mut PlanRenderingContext<'_, Plan>,
1297    ) -> fmt::Result {
1298        let mode = HumanizedExplain::new(ctx.config.redacted);
1299
1300        let plan = self;
1301        if !plan.closure.is_identity() {
1302            writeln!(f, "{}closure", ctx.indent)?;
1303            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1304        }
1305        {
1306            let lookup_relation = &plan.lookup_relation;
1307            let lookup_key = CompactScalarSeq(&plan.lookup_key);
1308            writeln!(
1309                f,
1310                "{}lookup={{ relation={}, key=[{}] }}",
1311                ctx.indent, lookup_relation, lookup_key
1312            )?;
1313        }
1314        {
1315            let stream_key = mode.seq(&plan.stream_key, None);
1316            let stream_key = CompactScalars(stream_key);
1317            let stream_thinning = Indices(&plan.stream_thinning);
1318            writeln!(
1319                f,
1320                "{}stream={{ key=[{}], thinning=({}) }}",
1321                ctx.indent, stream_key, stream_thinning
1322            )?;
1323        }
1324        Ok(())
1325    }
1326}
1327
1328impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaJoinPlan {
1329    fn fmt_text(
1330        &self,
1331        f: &mut fmt::Formatter<'_>,
1332        ctx: &mut PlanRenderingContext<'_, Plan>,
1333    ) -> fmt::Result {
1334        if ctx.config.verbose_syntax {
1335            self.fmt_verbose_text(f, ctx)
1336        } else {
1337            self.fmt_default_text(f, ctx)
1338        }
1339    }
1340}
1341impl DeltaJoinPlan {
1342    /// True iff any stage in any path is a cross product.
1343    fn has_cross_stage(&self) -> bool {
1344        self.path_plans
1345            .iter()
1346            .any(|p| p.stage_plans.iter().any(|s| s.lookup_key.is_empty()))
1347    }
1348
1349    fn fmt_default_text(
1350        &self,
1351        f: &mut fmt::Formatter<'_>,
1352        ctx: &mut PlanRenderingContext<'_, Plan>,
1353    ) -> fmt::Result {
1354        // The header chain already shows each path's positions + keys, so we
1355        // only print a `path %src:` block when at least one of its stage
1356        // closures or its final closure is non-identity.
1357        for plan in self.path_plans.iter() {
1358            let has_stage_closure = plan.stage_plans.iter().any(|s| s.closure.maps_or_filters());
1359            let has_final_closure = plan
1360                .final_closure
1361                .as_ref()
1362                .is_some_and(|c| c.maps_or_filters());
1363            if !has_stage_closure && !has_final_closure {
1364                continue;
1365            }
1366            writeln!(f, "{}path %{}:", ctx.indent, plan.source_relation)?;
1367            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1368        }
1369        Ok(())
1370    }
1371
1372    fn fmt_verbose_text(
1373        &self,
1374        f: &mut fmt::Formatter<'_>,
1375        ctx: &mut PlanRenderingContext<'_, Plan>,
1376    ) -> fmt::Result {
1377        for (i, plan) in self.path_plans.iter().enumerate() {
1378            writeln!(f, "{}plan_path[{}]", ctx.indent, i)?;
1379            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1380        }
1381        Ok(())
1382    }
1383}
1384
1385impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaPathPlan {
1386    fn fmt_text(
1387        &self,
1388        f: &mut fmt::Formatter<'_>,
1389        ctx: &mut PlanRenderingContext<'_, Plan>,
1390    ) -> fmt::Result {
1391        if ctx.config.verbose_syntax {
1392            self.fmt_verbose_text(f, ctx)
1393        } else {
1394            self.fmt_default_text(f, ctx)
1395        }
1396    }
1397}
1398
1399impl DeltaPathPlan {
1400    #[allow(clippy::needless_pass_by_ref_mut)]
1401    fn fmt_default_text(
1402        &self,
1403        f: &mut fmt::Formatter<'_>,
1404        ctx: &mut PlanRenderingContext<'_, Plan>,
1405    ) -> fmt::Result {
1406        for stage in self.stage_plans.iter() {
1407            if stage.closure.maps_or_filters() {
1408                writeln!(f, "{}after %{}:", ctx.indent, stage.lookup_relation)?;
1409                ctx.indented(|ctx| stage.closure.fmt_default_text(f, ctx))?;
1410            }
1411        }
1412        if let Some(final_closure) = &self.final_closure {
1413            if final_closure.maps_or_filters() {
1414                writeln!(f, "{}Final closure:", ctx.indent)?;
1415                ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))?;
1416            }
1417        }
1418        Ok(())
1419    }
1420
1421    fn fmt_verbose_text(
1422        &self,
1423        f: &mut fmt::Formatter<'_>,
1424        ctx: &mut PlanRenderingContext<'_, Plan>,
1425    ) -> fmt::Result {
1426        let mode = HumanizedExplain::new(ctx.config.redacted);
1427        let plan = self;
1428        if let Some(closure) = plan.final_closure.as_ref() {
1429            if !closure.is_identity() {
1430                writeln!(f, "{}final_closure", ctx.indent)?;
1431                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1432            }
1433        }
1434        for (i, plan) in plan.stage_plans.iter().enumerate().rev() {
1435            writeln!(f, "{}delta_stage[{}]", ctx.indent, i)?;
1436            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1437        }
1438        if !plan.initial_closure.is_identity() {
1439            writeln!(f, "{}initial_closure", ctx.indent)?;
1440            ctx.indented(|ctx| plan.initial_closure.fmt_text(f, ctx))?;
1441        }
1442        {
1443            let source_relation = &plan.source_relation;
1444            let source_key = mode.seq(&plan.source_key, None);
1445            let source_key = CompactScalars(source_key);
1446            writeln!(
1447                f,
1448                "{}source={{ relation={}, key=[{}] }}",
1449                ctx.indent, source_relation, source_key
1450            )?;
1451        }
1452        Ok(())
1453    }
1454}
1455
1456impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaStagePlan {
1457    fn fmt_text(
1458        &self,
1459        f: &mut fmt::Formatter<'_>,
1460        ctx: &mut PlanRenderingContext<'_, Plan>,
1461    ) -> fmt::Result {
1462        if ctx.config.verbose_syntax {
1463            self.fmt_verbose_text(f, ctx)
1464        } else {
1465            self.fmt_default_text(f, ctx)
1466        }
1467    }
1468}
1469impl DeltaStagePlan {
1470    #[allow(clippy::needless_pass_by_ref_mut)]
1471    fn fmt_default_text(
1472        &self,
1473        f: &mut fmt::Formatter<'_>,
1474        ctx: &mut PlanRenderingContext<'_, Plan>,
1475    ) -> fmt::Result {
1476        // NB this code path should not be live, as fmt_default_text for
1477        // `DeltaPathPlan` prints out each stage already
1478        let lookup_relation = &self.lookup_relation;
1479        let lookup_key = CompactScalarSeq(&self.lookup_key);
1480        writeln!(
1481            f,
1482            "{}Lookup key {lookup_key} in %{lookup_relation}",
1483            ctx.indent
1484        )
1485    }
1486
1487    fn fmt_verbose_text(
1488        &self,
1489        f: &mut fmt::Formatter<'_>,
1490        ctx: &mut PlanRenderingContext<'_, Plan>,
1491    ) -> fmt::Result {
1492        let mode = HumanizedExplain::new(ctx.config.redacted);
1493        let plan = self;
1494        if !plan.closure.is_identity() {
1495            writeln!(f, "{}closure", ctx.indent)?;
1496            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1497        }
1498        {
1499            let lookup_relation = &plan.lookup_relation;
1500            let lookup_key = mode.seq(&plan.lookup_key, None);
1501            let lookup_key = CompactScalars(lookup_key);
1502            writeln!(
1503                f,
1504                "{}lookup={{ relation={}, key=[{}] }}",
1505                ctx.indent, lookup_relation, lookup_key
1506            )?;
1507        }
1508        {
1509            let stream_key = mode.seq(&plan.stream_key, None);
1510            let stream_key = CompactScalars(stream_key);
1511            let stream_thinning = mode.seq(&plan.stream_thinning, None);
1512            let stream_thinning = CompactScalars(stream_thinning);
1513            writeln!(
1514                f,
1515                "{}stream={{ key=[{}], thinning=({}) }}",
1516                ctx.indent, stream_key, stream_thinning
1517            )?;
1518        }
1519        Ok(())
1520    }
1521}
1522
1523impl DisplayText<PlanRenderingContext<'_, Plan>> for JoinClosure {
1524    fn fmt_text(
1525        &self,
1526        f: &mut fmt::Formatter<'_>,
1527        ctx: &mut PlanRenderingContext<'_, Plan>,
1528    ) -> fmt::Result {
1529        if ctx.config.verbose_syntax {
1530            self.fmt_verbose_text(f, ctx)
1531        } else {
1532            self.fmt_default_text(f, ctx)
1533        }
1534    }
1535}
1536impl JoinClosure {
1537    fn fmt_default_text(
1538        &self,
1539        f: &mut fmt::Formatter<'_>,
1540        ctx: &mut PlanRenderingContext<'_, Plan>,
1541    ) -> fmt::Result {
1542        let mode = HumanizedExplain::new(ctx.config.redacted);
1543        if !self.before.is_identity() {
1544            mode.expr(self.before.deref(), None)
1545                .fmt_default_text(f, ctx)?;
1546        }
1547        if !self.ready_equivalences.is_empty() {
1548            let equivalences = separated(
1549                " AND ",
1550                self.ready_equivalences
1551                    .iter()
1552                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1553            );
1554            writeln!(f, "{}Equivalences: {equivalences}", ctx.indent)?;
1555        }
1556        Ok(())
1557    }
1558
1559    fn fmt_verbose_text(
1560        &self,
1561        f: &mut fmt::Formatter<'_>,
1562        ctx: &mut PlanRenderingContext<'_, Plan>,
1563    ) -> fmt::Result {
1564        let mode = HumanizedExplain::new(ctx.config.redacted);
1565        mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1566        if !self.ready_equivalences.is_empty() {
1567            let equivalences = separated(
1568                " AND ",
1569                self.ready_equivalences
1570                    .iter()
1571                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1572            );
1573            writeln!(f, "{}ready_equivalences={}", ctx.indent, equivalences)?;
1574        }
1575        Ok(())
1576    }
1577}
1578
1579impl DisplayText<PlanRenderingContext<'_, Plan>> for AccumulablePlan {
1580    fn fmt_text(
1581        &self,
1582        f: &mut fmt::Formatter<'_>,
1583        ctx: &mut PlanRenderingContext<'_, Plan>,
1584    ) -> fmt::Result {
1585        if ctx.config.verbose_syntax {
1586            self.fmt_verbose_text(f, ctx)
1587        } else {
1588            self.fmt_default_text(f, ctx)
1589        }
1590    }
1591}
1592impl AccumulablePlan {
1593    #[allow(clippy::needless_pass_by_ref_mut)]
1594    fn fmt_default_text(
1595        &self,
1596        f: &mut fmt::Formatter<'_>,
1597        ctx: &mut PlanRenderingContext<'_, Plan>,
1598    ) -> fmt::Result {
1599        let mode = HumanizedExplain::new(ctx.config.redacted);
1600
1601        if !self.simple_aggrs.is_empty() {
1602            let simple_aggrs = self
1603                .simple_aggrs
1604                .iter()
1605                .map(|(_i_datum, agg)| mode.expr(agg, None));
1606            let simple_aggrs = separated(", ", simple_aggrs);
1607            writeln!(f, "{}Simple aggregates: {simple_aggrs}", ctx.indent)?;
1608        }
1609
1610        if !self.distinct_aggrs.is_empty() {
1611            let distinct_aggrs = self
1612                .distinct_aggrs
1613                .iter()
1614                .map(|(_i_datum, agg)| mode.expr(agg, None));
1615            let distinct_aggrs = separated(", ", distinct_aggrs);
1616            writeln!(f, "{}Distinct aggregates: {distinct_aggrs}", ctx.indent)?;
1617        }
1618        Ok(())
1619    }
1620
1621    #[allow(clippy::needless_pass_by_ref_mut)]
1622    fn fmt_verbose_text(
1623        &self,
1624        f: &mut fmt::Formatter<'_>,
1625        ctx: &mut PlanRenderingContext<'_, Plan>,
1626    ) -> fmt::Result {
1627        let mode = HumanizedExplain::new(ctx.config.redacted);
1628        // full_aggrs (skipped because they are repeated in simple_aggrs ∪ distinct_aggrs)
1629        // for (i, aggr) in self.full_aggrs.iter().enumerate() {
1630        //     write!(f, "{}full_aggrs[{}]=", ctx.indent, i)?;
1631        //     aggr.fmt_text(f, &mut ())?;
1632        //     writeln!(f)?;
1633        // }
1634        // simple_aggrs
1635        for (i, (i_datum, agg)) in self.simple_aggrs.iter().enumerate() {
1636            let agg = mode.expr(agg, None);
1637            write!(f, "{}simple_aggrs[{}]=", ctx.indent, i)?;
1638            writeln!(f, "({}, {})", i_datum, agg)?;
1639        }
1640        // distinct_aggrs
1641        for (i, (i_datum, agg)) in self.distinct_aggrs.iter().enumerate() {
1642            let agg = mode.expr(agg, None);
1643            write!(f, "{}distinct_aggrs[{}]=", ctx.indent, i)?;
1644            writeln!(f, "({}, {})", i_datum, agg)?;
1645        }
1646        Ok(())
1647    }
1648}
1649
1650impl DisplayText<PlanRenderingContext<'_, Plan>> for HierarchicalPlan {
1651    fn fmt_text(
1652        &self,
1653        f: &mut fmt::Formatter<'_>,
1654        ctx: &mut PlanRenderingContext<'_, Plan>,
1655    ) -> fmt::Result {
1656        if ctx.config.verbose_syntax {
1657            self.fmt_verbose_text(f, ctx)
1658        } else {
1659            self.fmt_default_text(f, ctx)
1660        }
1661    }
1662}
1663impl HierarchicalPlan {
1664    #[allow(clippy::needless_pass_by_ref_mut)]
1665    fn fmt_default_text(
1666        &self,
1667        f: &mut fmt::Formatter<'_>,
1668        ctx: &mut PlanRenderingContext<'_, Plan>,
1669    ) -> fmt::Result {
1670        let mode = HumanizedExplain::new(ctx.config.redacted);
1671        let aggr_funcs = mode.seq(self.aggr_funcs(), None);
1672        let aggr_funcs = separated(", ", aggr_funcs);
1673        writeln!(f, "{}Aggregations: {aggr_funcs}", ctx.indent)
1674    }
1675
1676    #[allow(clippy::needless_pass_by_ref_mut)]
1677    fn fmt_verbose_text(
1678        &self,
1679        f: &mut fmt::Formatter<'_>,
1680        ctx: &mut PlanRenderingContext<'_, Plan>,
1681    ) -> fmt::Result {
1682        let mode = HumanizedExplain::new(ctx.config.redacted);
1683        match self {
1684            HierarchicalPlan::Monotonic(plan) => {
1685                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1686                let aggr_funcs = separated(", ", aggr_funcs);
1687                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1688                writeln!(f, "{}monotonic", ctx.indent)?;
1689                if plan.must_consolidate {
1690                    writeln!(f, "{}must_consolidate", ctx.indent)?;
1691                }
1692            }
1693            HierarchicalPlan::Bucketed(plan) => {
1694                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1695                let aggr_funcs = separated(", ", aggr_funcs);
1696                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1697                let buckets = separated(", ", &plan.buckets);
1698                writeln!(f, "{}buckets=[{}]", ctx.indent, buckets)?;
1699            }
1700        }
1701        Ok(())
1702    }
1703}
1704
1705impl DisplayText<PlanRenderingContext<'_, Plan>> for BasicPlan {
1706    fn fmt_text(
1707        &self,
1708        f: &mut fmt::Formatter<'_>,
1709        ctx: &mut PlanRenderingContext<'_, Plan>,
1710    ) -> fmt::Result {
1711        if ctx.config.verbose_syntax {
1712            self.fmt_verbose_text(f, ctx)
1713        } else {
1714            self.fmt_default_text(f, ctx)
1715        }
1716    }
1717}
1718impl BasicPlan {
1719    #[allow(clippy::needless_pass_by_ref_mut)]
1720    fn fmt_default_text(
1721        &self,
1722        f: &mut fmt::Formatter<'_>,
1723        ctx: &mut PlanRenderingContext<'_, Plan>,
1724    ) -> fmt::Result {
1725        let mode = HumanizedExplain::new(ctx.config.redacted);
1726        match self {
1727            BasicPlan::Single(SingleBasicPlan {
1728                expr,
1729                fused_unnest_list: _,
1730            }) => {
1731                let agg = mode.expr(expr, None);
1732                writeln!(f, "{}Aggregation: {agg}", ctx.indent)?;
1733            }
1734            BasicPlan::Multiple(aggs) => {
1735                let mode = HumanizedExplain::new(ctx.config.redacted);
1736                write!(f, "{}Aggregations:", ctx.indent)?;
1737
1738                for agg in aggs.iter() {
1739                    let agg = mode.expr(agg, None);
1740                    write!(f, " {agg}")?;
1741                }
1742                writeln!(f)?;
1743            }
1744        }
1745        Ok(())
1746    }
1747
1748    #[allow(clippy::needless_pass_by_ref_mut)]
1749    fn fmt_verbose_text(
1750        &self,
1751        f: &mut fmt::Formatter<'_>,
1752        ctx: &mut PlanRenderingContext<'_, Plan>,
1753    ) -> fmt::Result {
1754        let mode = HumanizedExplain::new(ctx.config.redacted);
1755        match self {
1756            BasicPlan::Single(SingleBasicPlan {
1757                expr,
1758                fused_unnest_list,
1759            }) => {
1760                let agg = mode.expr(expr, None);
1761                let fused_unnest_list = if *fused_unnest_list {
1762                    ", fused_unnest_list=true"
1763                } else {
1764                    ""
1765                };
1766                writeln!(f, "{}aggr=({}{})", ctx.indent, agg, fused_unnest_list)?;
1767            }
1768            BasicPlan::Multiple(aggs) => {
1769                for (i, agg) in aggs.iter().enumerate() {
1770                    let agg = mode.expr(agg, None);
1771                    writeln!(f, "{}aggrs[{}]={}", ctx.indent, i, agg)?;
1772                }
1773            }
1774        }
1775        Ok(())
1776    }
1777}
1778
1779/// Helper struct for rendering an arrangement.
1780struct Arrangement<'a> {
1781    key: &'a Vec<MirScalarExpr>,
1782    permutation: Permutation<'a>,
1783    thinning: &'a Vec<usize>,
1784}
1785
1786impl<'a> From<&'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> for Arrangement<'a> {
1787    fn from(
1788        (key, permutation, thinning): &'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
1789    ) -> Self {
1790        Arrangement {
1791            key,
1792            permutation: Permutation(permutation),
1793            thinning,
1794        }
1795    }
1796}
1797
1798impl<'a> DisplayText<PlanRenderingContext<'_, Plan>> for Arrangement<'a> {
1799    fn fmt_text(
1800        &self,
1801        f: &mut fmt::Formatter<'_>,
1802        ctx: &mut PlanRenderingContext<'_, Plan>,
1803    ) -> fmt::Result {
1804        if ctx.config.verbose_syntax {
1805            self.fmt_verbose_text(f, ctx)
1806        } else {
1807            self.fmt_default_text(f, ctx)
1808        }
1809    }
1810}
1811
1812impl<'a> Arrangement<'a> {
1813    #[allow(clippy::needless_pass_by_ref_mut)]
1814    fn fmt_default_text(
1815        &self,
1816        f: &mut fmt::Formatter<'_>,
1817        ctx: &PlanRenderingContext<'_, Plan>,
1818    ) -> fmt::Result {
1819        let mode = HumanizedExplain::new(ctx.config.redacted);
1820        if !self.key.is_empty() {
1821            let key = mode.seq(self.key, None);
1822            let key = CompactScalars(key);
1823            write!(f, "{key}")
1824        } else {
1825            write!(f, "(empty key)")
1826        }
1827    }
1828
1829    #[allow(clippy::needless_pass_by_ref_mut)]
1830    fn fmt_verbose_text(
1831        &self,
1832        f: &mut fmt::Formatter<'_>,
1833        ctx: &PlanRenderingContext<'_, Plan>,
1834    ) -> fmt::Result {
1835        let mode = HumanizedExplain::new(ctx.config.redacted);
1836        // prepare key
1837        let key = mode.seq(self.key, None);
1838        let key = CompactScalars(key);
1839        // prepare perumation map
1840        let permutation = &self.permutation;
1841        // prepare thinning
1842        let thinning = Indices(self.thinning);
1843        // write the arrangement spec
1844        write!(
1845            f,
1846            "{{ key=[{}], permutation={}, thinning=({}) }}",
1847            key, permutation, thinning
1848        )
1849    }
1850}
1851
1852/// Helper struct for rendering a permutation.
1853struct Permutation<'a>(&'a Vec<usize>);
1854
1855impl<'a> fmt::Display for Permutation<'a> {
1856    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1857        let mut pairs = vec![];
1858        for (x, y) in self.0.iter().enumerate().filter(|(x, y)| x != *y) {
1859            pairs.push(format!("#{}: #{}", x, y));
1860        }
1861
1862        if pairs.len() > 0 {
1863            write!(f, "{{{}}}", separated(", ", pairs))
1864        } else {
1865            write!(f, "id")
1866        }
1867    }
1868}
1869
1870/// Annotations for physical plans.
1871struct PlanAnnotations {
1872    config: ExplainConfig,
1873    node_id: LirId,
1874}
1875
1876// The current implementation deviates from the `AnnotatedPlan` used in `Mir~`-based plans. This is
1877// fine, since at the moment the only attribute we are going to explain is the `node_id`, which at
1878// the moment is kept inline with the `Plan` variants. If at some point in the future we want to
1879// start deriving and printing attributes that are derived ad-hoc, however, we might want to adopt
1880// `AnnotatedPlan` here as well.
1881impl PlanAnnotations {
1882    fn new(config: ExplainConfig, plan: &Plan) -> Self {
1883        let node_id = plan.lir_id;
1884        Self { config, node_id }
1885    }
1886}
1887
1888impl fmt::Display for PlanAnnotations {
1889    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1890        if self.config.node_ids {
1891            f.debug_struct(" //")
1892                .field("node_id", &self.node_id)
1893                .finish()
1894        } else {
1895            // No physical plan annotations enabled.
1896            Ok(())
1897        }
1898    }
1899}