mz_compute_types/explain/
text.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `EXPLAIN ... AS TEXT` support for LIR structures.
11//!
12//! The format adheres to the following conventions:
13//! 1. In general, every line that starts with an uppercase character
14//!    corresponds to a [`Plan`] variant.
15//! 2. Whenever the variant has an attached `~Plan`, the printed name is
16//!    `$V::$P` where `$V` identifies the variant and `$P` the plan.
17//! 3. The fields of a `~Plan` struct attached to a [`Plan`] are rendered as if
18//!    they were part of the variant themself.
19//! 4. Non-recursive parameters of each sub-plan are written as `$key=$val`
20//!    pairs on the same line or as lowercase `$key` fields on indented lines.
21//! 5. A single non-recursive parameter can be written just as `$val`.
22
23use std::fmt;
24use std::ops::Deref;
25
26use itertools::Itertools;
27use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
28use mz_expr::{Id, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_ore::str::{IndentLike, StrExt, separated};
31use mz_repr::explain::text::DisplayText;
32use mz_repr::explain::{
33    CompactScalarSeq, CompactScalars, ExplainConfig, Indices, PlanRenderingContext,
34};
35
36use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
37use crate::plan::join::linear_join::LinearStagePlan;
38use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
39use crate::plan::reduce::{
40    AccumulablePlan, BasicPlan, BucketedPlan, CollationPlan, HierarchicalPlan, MonotonicPlan,
41    SingleBasicPlan,
42};
43use crate::plan::threshold::ThresholdPlan;
44use crate::plan::{AvailableCollections, LirId, Plan, PlanNode};
45
46impl DisplayText<PlanRenderingContext<'_, Plan>> for Plan {
47    fn fmt_text(
48        &self,
49        f: &mut fmt::Formatter<'_>,
50        ctx: &mut PlanRenderingContext<'_, Plan>,
51    ) -> fmt::Result {
52        if ctx.config.verbose_syntax {
53            self.fmt_verbose_text(f, ctx)
54        } else {
55            self.fmt_default_text(f, ctx)
56        }
57    }
58}
59
60impl Plan {
61    // NOTE: This code needs to be kept in sync with the `Display` instance for
62    // `RenderPlan:ExprHumanizer`.
63    //
64    // This code determines what you see in `EXPLAIN`; that other code
65    // determine what you see when you run `mz_lir_mapping`.
66    fn fmt_default_text(
67        &self,
68        f: &mut fmt::Formatter<'_>,
69        ctx: &mut PlanRenderingContext<'_, Plan>,
70    ) -> fmt::Result {
71        use PlanNode::*;
72
73        let mode = HumanizedExplain::new(ctx.config.redacted);
74        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
75
76        match &self.node {
77            Constant { rows } => {
78                write!(f, "{}→Constant ", ctx.indent)?;
79
80                match rows {
81                    Ok(rows) => write!(
82                        f,
83                        "({} row{})",
84                        rows.len(),
85                        if rows.len() == 1 { "" } else { "s" }
86                    )?,
87                    Err(err) => {
88                        if mode.redacted() {
89                            write!(f, "(error: █)")?;
90                        } else {
91                            write!(f, "(error: {})", err.to_string().quoted(),)?;
92                        }
93                    }
94                }
95
96                writeln!(f, "{annotations}")?;
97            }
98            Get { id, keys, plan } => {
99                ctx.indent.set(); // mark the current indent level
100
101                // Resolve the id as a string.
102                let id = match id {
103                    Id::Local(id) => id.to_string(),
104                    Id::Global(id) => ctx
105                        .humanizer
106                        .humanize_id(*id)
107                        .unwrap_or_else(|| id.to_string()),
108                };
109                // Render plan-specific fields.
110                use crate::plan::GetPlan;
111                match plan {
112                    GetPlan::PassArrangements => {
113                        if keys.raw && keys.arranged.is_empty() {
114                            writeln!(f, "{}→Stream {id}{annotations}", ctx.indent)?;
115                        } else {
116                            // we're not reporting on whether or not `raw` is set
117                            // we're not reporting on how many arrangements there are
118                            writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
119                        }
120                    }
121                    GetPlan::Arrangement(key, Some(val), mfp) => {
122                        if !mfp.is_identity() {
123                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
124                            ctx.indent += 1;
125                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
126                            ctx.indent += 1;
127                        }
128
129                        writeln!(f, "{}→Index Lookup on {id}{annotations}", ctx.indent)?;
130                        ctx.indent += 1;
131                        let key = CompactScalars(mode.seq(key, None));
132                        write!(f, "{}Key: ({key}) ", ctx.indent)?;
133                        let val = mode.expr(val, None);
134                        writeln!(f, "Value: {val}")?;
135                    }
136                    GetPlan::Arrangement(key, None, mfp) => {
137                        if !mfp.is_identity() {
138                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
139                            ctx.indent += 1;
140                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
141                            ctx.indent += 1;
142                        }
143
144                        writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
145                        ctx.indent += 1;
146                        let key = CompactScalars(mode.seq(key, None));
147                        writeln!(f, "{}Key: ({key})", ctx.indent)?;
148                    }
149                    GetPlan::Collection(mfp) => {
150                        if !mfp.is_identity() {
151                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
152                            ctx.indent += 1;
153                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
154                            ctx.indent += 1;
155                        }
156
157                        writeln!(f, "{}→Read {id}{annotations}", ctx.indent)?;
158                    }
159                }
160                ctx.indent.reset(); // reset the original indent level
161            }
162            Let { id, value, body } => {
163                let mut bindings = vec![(id, value.as_ref())];
164                let mut head = body.as_ref();
165
166                // Render Let-blocks nested in the body an outer Let-block in one step
167                // with a flattened list of bindings
168                while let Let { id, value, body } = &head.node {
169                    bindings.push((id, value.as_ref()));
170                    head = body.as_ref();
171                }
172
173                writeln!(f, "{}→With", ctx.indent)?;
174                ctx.indented(|ctx| {
175                    for (id, value) in bindings.iter() {
176                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
177                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
178                    }
179                    Ok(())
180                })?;
181                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
182                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
183            }
184            LetRec {
185                ids,
186                values,
187                limits,
188                body,
189            } => {
190                let head = body.as_ref();
191
192                writeln!(f, "{}→With Mutually Recursive", ctx.indent)?;
193                ctx.indented(|ctx| {
194                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
195                    for ((id, value), limit) in bindings {
196                        if let Some(limit) = limit {
197                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
198                        } else {
199                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
200                        }
201                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
202                    }
203                    Ok(())
204                })?;
205                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
206                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
207            }
208            Mfp {
209                input,
210                mfp,
211                input_key_val: _,
212            } => {
213                writeln!(f, "{}→Map/Filter/Project{annotations}", ctx.indent)?;
214                ctx.indent.set();
215
216                ctx.indent += 1;
217                mode.expr(mfp, None).fmt_default_text(f, ctx)?;
218
219                // one more nesting level if we showed anything for the MFP
220                if !mfp.is_identity() {
221                    ctx.indent += 1;
222                }
223                input.fmt_text(f, ctx)?;
224                ctx.indent.reset();
225            }
226            FlatMap {
227                input_key: _,
228                input,
229                exprs,
230                func,
231                mfp_after,
232            } => {
233                ctx.indent.set();
234                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
235                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
236                    ctx.indent += 1;
237                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
238                    ctx.indent += 1;
239                }
240
241                let exprs = mode.seq(exprs, None);
242                let exprs = CompactScalars(exprs);
243                writeln!(
244                    f,
245                    "{}→Table Function {func}({exprs}){annotations}",
246                    ctx.indent
247                )?;
248                ctx.indent += 1;
249
250                input.fmt_text(f, ctx)?;
251
252                ctx.indent.reset();
253            }
254            Join { inputs, plan } => {
255                use crate::plan::join::JoinPlan;
256                match plan {
257                    JoinPlan::Linear(plan) => {
258                        write!(f, "{}→Differential Join", ctx.indent)?;
259                        write!(f, " %{}", plan.source_relation)?;
260                        for dsp in &plan.stage_plans {
261                            write!(f, " » %{}", dsp.lookup_relation)?;
262                        }
263                        writeln!(f, "{annotations}")?;
264                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
265                    }
266                    JoinPlan::Delta(plan) => {
267                        write!(f, "{}→Delta Join", ctx.indent)?;
268                        for dpp in &plan.path_plans {
269                            write!(f, " [%{}", dpp.source_relation)?;
270
271                            for dsp in &dpp.stage_plans {
272                                write!(f, " » %{}", dsp.lookup_relation)?;
273                            }
274                            write!(f, "]")?;
275                        }
276                        writeln!(f, "{annotations}")?;
277                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
278                    }
279                }
280
281                ctx.indented(|ctx| {
282                    for input in inputs {
283                        input.fmt_text(f, ctx)?;
284                    }
285                    Ok(())
286                })?;
287            }
288            Reduce {
289                input_key: _,
290                input,
291                key_val_plan,
292                plan,
293                mfp_after,
294            } => {
295                ctx.indent.set();
296                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
297                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
298                    ctx.indent += 1;
299                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
300                    ctx.indent += 1;
301                }
302
303                use crate::plan::reduce::ReducePlan;
304                match plan {
305                    ReducePlan::Distinct => {
306                        writeln!(f, "{}→Distinct GroupAggregate{annotations}", ctx.indent)?;
307                    }
308                    ReducePlan::Accumulable(plan) => {
309                        writeln!(f, "{}→Accumulable GroupAggregate{annotations}", ctx.indent)?;
310                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
311                    }
312                    ReducePlan::Hierarchical(
313                        plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }),
314                    ) => {
315                        write!(
316                            f,
317                            "{}→Bucketed Hierarchical GroupAggregate (buckets: ",
318                            ctx.indent
319                        )?;
320                        for bucket in buckets {
321                            write!(f, " {bucket}")?;
322                        }
323                        writeln!(f, "){annotations}")?;
324                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
325                    }
326                    ReducePlan::Hierarchical(
327                        plan @ HierarchicalPlan::Monotonic(MonotonicPlan {
328                            must_consolidate, ..
329                        }),
330                    ) => {
331                        write!(f, "{}→", ctx.indent)?;
332                        if *must_consolidate {
333                            write!(f, "Consolidating ")?;
334                        }
335                        writeln!(f, "Monotonic GroupAggregate{annotations}",)?;
336                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
337                    }
338                    ReducePlan::Basic(plan) => {
339                        ctx.indent.set();
340                        if let BasicPlan::Single(SingleBasicPlan {
341                            fused_unnest_list, ..
342                        }) = &plan
343                        {
344                            if *fused_unnest_list {
345                                writeln!(
346                                    f,
347                                    "{}→Fused with Child Table Function unnest_list",
348                                    ctx.indent
349                                )?;
350                                ctx.indent += 1;
351                            }
352                        }
353                        writeln!(
354                            f,
355                            "{}→Non-incremental GroupAggregate{annotations}",
356                            ctx.indent
357                        )?;
358                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
359                        ctx.indent.reset();
360                    }
361                    ReducePlan::Collation(plan) => {
362                        writeln!(
363                            f,
364                            "{}→Collated Multi-GroupAggregate{annotations}",
365                            ctx.indent
366                        )?;
367                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
368                    }
369                }
370
371                ctx.indented(|ctx| {
372                    let kvp = key_val_plan.key_plan.deref();
373                    if !kvp.is_identity() {
374                        writeln!(f, "{}Key:", ctx.indent)?;
375                        ctx.indented(|ctx| {
376                            let key_plan = mode.expr(kvp, None);
377                            key_plan.fmt_default_text(f, ctx)
378                        })?;
379                    }
380
381                    input.fmt_text(f, ctx)
382                })?;
383
384                ctx.indent.reset();
385            }
386            TopK { input, top_k_plan } => {
387                use crate::plan::top_k::TopKPlan;
388                match top_k_plan {
389                    TopKPlan::MonotonicTop1(plan) => {
390                        write!(f, "{}→", ctx.indent)?;
391                        if plan.must_consolidate {
392                            write!(f, "Consolidating ")?;
393                        }
394                        writeln!(f, "Monotonic Top1{annotations}")?;
395
396                        ctx.indented(|ctx| {
397                            if plan.group_key.len() > 0 {
398                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
399                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
400                            }
401                            if plan.order_key.len() > 0 {
402                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
403                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
404                            }
405                            Ok(())
406                        })?;
407                    }
408                    TopKPlan::MonotonicTopK(plan) => {
409                        write!(f, "{}→", ctx.indent)?;
410                        if plan.must_consolidate {
411                            write!(f, "Consolidating ")?;
412                        }
413                        writeln!(f, "Monotonic TopK{annotations}")?;
414
415                        ctx.indented(|ctx| {
416                            if plan.group_key.len() > 0 {
417                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
418                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
419                            }
420                            if plan.order_key.len() > 0 {
421                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
422                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
423                            }
424                            if let Some(limit) = &plan.limit {
425                                let limit = mode.expr(limit, None);
426                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
427                            }
428                            Ok(())
429                        })?;
430                    }
431                    TopKPlan::Basic(plan) => {
432                        writeln!(f, "{}→Non-monotonic TopK{annotations}", ctx.indent)?;
433
434                        ctx.indented(|ctx| {
435                            if plan.group_key.len() > 0 {
436                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
437                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
438                            }
439                            if plan.order_key.len() > 0 {
440                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
441                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
442                            }
443                            if let Some(limit) = &plan.limit {
444                                let limit = mode.expr(limit, None);
445                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
446                            }
447                            if plan.offset != 0 {
448                                let offset = plan.offset;
449                                writeln!(f, "{}Offset {offset}", ctx.indent)?;
450                            }
451                            Ok(())
452                        })?;
453                    }
454                }
455
456                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
457            }
458            Negate { input } => {
459                writeln!(f, "{}→Negate Diffs{annotations}", ctx.indent)?;
460
461                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
462            }
463            Threshold {
464                input,
465                threshold_plan,
466            } => {
467                match threshold_plan {
468                    ThresholdPlan::Basic(plan) => {
469                        write!(f, "{}→Threshold Diffs ", ctx.indent)?;
470                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
471                        ensure_arrangement.fmt_text(f, ctx)?;
472                        writeln!(f, "{annotations}")?;
473                    }
474                };
475
476                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
477            }
478            Union {
479                inputs,
480                consolidate_output,
481            } => {
482                write!(f, "{}→", ctx.indent)?;
483                if *consolidate_output {
484                    write!(f, "Consolidating ")?;
485                }
486                writeln!(f, "Union{annotations}")?;
487
488                ctx.indented(|ctx| {
489                    for input in inputs.iter() {
490                        input.fmt_text(f, ctx)?;
491                    }
492                    Ok(())
493                })?;
494            }
495            ArrangeBy {
496                input_key: _,
497                input,
498                input_mfp,
499                forms,
500            } => {
501                ctx.indent.set();
502                if forms.raw && forms.arranged.is_empty() {
503                    soft_assert_or_log!(forms.raw, "raw stream with no arrangements");
504                    writeln!(f, "{}→Unarranged Raw Stream{annotations}", ctx.indent)?;
505                } else {
506                    write!(f, "{}→Arrange", ctx.indent)?;
507
508                    if !forms.arranged.is_empty() {
509                        let mode = HumanizedExplain::new(ctx.config.redacted);
510                        for (key, _, _) in &forms.arranged {
511                            if !key.is_empty() {
512                                let key = mode.seq(key, None);
513                                let key = CompactScalars(key);
514                                write!(f, " ({key})")?;
515                            } else {
516                                write!(f, " (empty key)")?;
517                            }
518                        }
519                    }
520                    writeln!(f, "{annotations}")?;
521                }
522
523                if !input_mfp.is_identity() {
524                    ctx.indent += 1;
525                    writeln!(f, "{}→Fused with Parent Map/Filter/Project", ctx.indent)?;
526                    ctx.indented(|ctx| mode.expr(input_mfp, None).fmt_default_text(f, ctx))?;
527                }
528
529                ctx.indent += 1;
530                input.fmt_text(f, ctx)?;
531                ctx.indent.reset();
532            }
533        }
534
535        Ok(())
536    }
537
538    fn fmt_verbose_text(
539        &self,
540        f: &mut fmt::Formatter<'_>,
541        ctx: &mut PlanRenderingContext<'_, Plan>,
542    ) -> fmt::Result {
543        use PlanNode::*;
544
545        let mode = HumanizedExplain::new(ctx.config.redacted);
546        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
547
548        match &self.node {
549            Constant { rows } => match rows {
550                Ok(rows) => {
551                    if !rows.is_empty() {
552                        writeln!(f, "{}Constant{}", ctx.indent, annotations)?;
553                        ctx.indented(|ctx| {
554                            fmt_text_constant_rows(
555                                f,
556                                rows.iter().map(|(data, _, diff)| (data, diff)),
557                                &mut ctx.indent,
558                                ctx.config.redacted,
559                            )
560                        })?;
561                    } else {
562                        writeln!(f, "{}Constant <empty>{}", ctx.indent, annotations)?;
563                    }
564                }
565                Err(err) => {
566                    if mode.redacted() {
567                        writeln!(f, "{}Error █{}", ctx.indent, annotations)?;
568                    } else {
569                        {
570                            writeln!(
571                                f,
572                                "{}Error {}{}",
573                                ctx.indent,
574                                err.to_string().quoted(),
575                                annotations
576                            )?;
577                        }
578                    }
579                }
580            },
581
582            Get { id, keys, plan } => {
583                ctx.indent.set(); // mark the current indent level
584
585                // Resolve the id as a string.
586                let id = match id {
587                    Id::Local(id) => id.to_string(),
588                    Id::Global(id) => ctx
589                        .humanizer
590                        .humanize_id(*id)
591                        .unwrap_or_else(|| id.to_string()),
592                };
593                // Render plan-specific fields.
594                use crate::plan::GetPlan;
595                match plan {
596                    GetPlan::PassArrangements => {
597                        writeln!(
598                            f,
599                            "{}Get::PassArrangements {}{}",
600                            ctx.indent, id, annotations
601                        )?;
602                        ctx.indent += 1;
603                    }
604                    GetPlan::Arrangement(key, val, mfp) => {
605                        writeln!(f, "{}Get::Arrangement {}{}", ctx.indent, id, annotations)?;
606                        ctx.indent += 1;
607                        mode.expr(mfp, None).fmt_text(f, ctx)?;
608                        {
609                            let key = mode.seq(key, None);
610                            let key = CompactScalars(key);
611                            writeln!(f, "{}key={}", ctx.indent, key)?;
612                        }
613                        if let Some(val) = val {
614                            let val = mode.expr(val, None);
615                            writeln!(f, "{}val={}", ctx.indent, val)?;
616                        }
617                    }
618                    GetPlan::Collection(mfp) => {
619                        writeln!(f, "{}Get::Collection {}{}", ctx.indent, id, annotations)?;
620                        ctx.indent += 1;
621                        mode.expr(mfp, None).fmt_text(f, ctx)?;
622                    }
623                }
624
625                // Render plan-agnostic fields (common for all plans for this variant).
626                keys.fmt_text(f, ctx)?;
627
628                ctx.indent.reset(); // reset the original indent level
629            }
630            Let { id, value, body } => {
631                let mut bindings = vec![(id, value.as_ref())];
632                let mut head = body.as_ref();
633
634                // Render Let-blocks nested in the body an outer Let-block in one step
635                // with a flattened list of bindings
636                while let Let { id, value, body } = &head.node {
637                    bindings.push((id, value.as_ref()));
638                    head = body.as_ref();
639                }
640
641                writeln!(f, "{}With", ctx.indent)?;
642                ctx.indented(|ctx| {
643                    for (id, value) in bindings.iter() {
644                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
645                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
646                    }
647                    Ok(())
648                })?;
649                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
650                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
651            }
652            LetRec {
653                ids,
654                values,
655                limits,
656                body,
657            } => {
658                let head = body.as_ref();
659
660                writeln!(f, "{}With Mutually Recursive", ctx.indent)?;
661                ctx.indented(|ctx| {
662                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
663                    for ((id, value), limit) in bindings {
664                        if let Some(limit) = limit {
665                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
666                        } else {
667                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
668                        }
669                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
670                    }
671                    Ok(())
672                })?;
673                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
674                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
675            }
676            Mfp {
677                input,
678                mfp,
679                input_key_val,
680            } => {
681                writeln!(f, "{}Mfp{}", ctx.indent, annotations)?;
682                ctx.indented(|ctx| {
683                    mode.expr(mfp, None).fmt_text(f, ctx)?;
684                    if let Some((key, val)) = input_key_val {
685                        {
686                            let key = mode.seq(key, None);
687                            let key = CompactScalars(key);
688                            writeln!(f, "{}input_key={}", ctx.indent, key)?;
689                        }
690                        if let Some(val) = val {
691                            let val = mode.expr(val, None);
692                            writeln!(f, "{}input_val={}", ctx.indent, val)?;
693                        }
694                    }
695                    input.fmt_text(f, ctx)
696                })?;
697            }
698            FlatMap {
699                input_key,
700                input,
701                exprs,
702                func,
703                mfp_after,
704            } => {
705                let exprs = mode.seq(exprs, None);
706                let exprs = CompactScalars(exprs);
707                writeln!(
708                    f,
709                    "{}FlatMap {}({}){}",
710                    ctx.indent, func, exprs, annotations
711                )?;
712                ctx.indented(|ctx| {
713                    if let Some(key) = input_key {
714                        let key = mode.seq(key, None);
715                        let key = CompactScalars(key);
716                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
717                    }
718                    if !mfp_after.is_identity() {
719                        writeln!(f, "{}mfp_after", ctx.indent)?;
720                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
721                    }
722                    input.fmt_text(f, ctx)
723                })?;
724            }
725            Join { inputs, plan } => {
726                use crate::plan::join::JoinPlan;
727                match plan {
728                    JoinPlan::Linear(plan) => {
729                        writeln!(f, "{}Join::Linear{}", ctx.indent, annotations)?;
730                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
731                    }
732                    JoinPlan::Delta(plan) => {
733                        writeln!(f, "{}Join::Delta{}", ctx.indent, annotations)?;
734                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
735                    }
736                }
737                ctx.indented(|ctx| {
738                    for input in inputs {
739                        input.fmt_text(f, ctx)?;
740                    }
741                    Ok(())
742                })?;
743            }
744            Reduce {
745                input_key,
746                input,
747                key_val_plan,
748                plan,
749                mfp_after,
750            } => {
751                use crate::plan::reduce::ReducePlan;
752                match plan {
753                    ReducePlan::Distinct => {
754                        writeln!(f, "{}Reduce::Distinct{}", ctx.indent, annotations)?;
755                    }
756                    ReducePlan::Accumulable(plan) => {
757                        writeln!(f, "{}Reduce::Accumulable{}", ctx.indent, annotations)?;
758                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
759                    }
760                    ReducePlan::Hierarchical(plan) => {
761                        writeln!(f, "{}Reduce::Hierarchical{}", ctx.indent, annotations)?;
762                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
763                    }
764                    ReducePlan::Basic(plan) => {
765                        writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
766                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
767                    }
768                    ReducePlan::Collation(plan) => {
769                        writeln!(f, "{}Reduce::Collation{}", ctx.indent, annotations)?;
770                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
771                    }
772                }
773                ctx.indented(|ctx| {
774                    if let Some(key) = input_key {
775                        let key = mode.seq(key, None);
776                        let key = CompactScalars(key);
777                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
778                    }
779                    if key_val_plan.key_plan.deref().is_identity() {
780                        writeln!(f, "{}key_plan=id", ctx.indent)?;
781                    } else {
782                        writeln!(f, "{}key_plan", ctx.indent)?;
783                        ctx.indented(|ctx| {
784                            let key_plan = mode.expr(key_val_plan.key_plan.deref(), None);
785                            key_plan.fmt_text(f, ctx)
786                        })?;
787                    }
788                    if key_val_plan.val_plan.deref().is_identity() {
789                        writeln!(f, "{}val_plan=id", ctx.indent)?;
790                    } else {
791                        writeln!(f, "{}val_plan", ctx.indent)?;
792                        ctx.indented(|ctx| {
793                            let val_plan = mode.expr(key_val_plan.val_plan.deref(), None);
794                            val_plan.fmt_text(f, ctx)
795                        })?;
796                    }
797                    if !mfp_after.is_identity() {
798                        writeln!(f, "{}mfp_after", ctx.indent)?;
799                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
800                    }
801
802                    input.fmt_text(f, ctx)
803                })?;
804            }
805            TopK { input, top_k_plan } => {
806                use crate::plan::top_k::TopKPlan;
807                match top_k_plan {
808                    TopKPlan::MonotonicTop1(plan) => {
809                        write!(f, "{}TopK::MonotonicTop1", ctx.indent)?;
810                        if plan.group_key.len() > 0 {
811                            let group_by = mode.seq(&plan.group_key, None);
812                            let group_by = CompactScalars(group_by);
813                            write!(f, " group_by=[{}]", group_by)?;
814                        }
815                        if plan.order_key.len() > 0 {
816                            let order_by = mode.seq(&plan.order_key, None);
817                            let order_by = separated(", ", order_by);
818                            write!(f, " order_by=[{}]", order_by)?;
819                        }
820                        if plan.must_consolidate {
821                            write!(f, " must_consolidate")?;
822                        }
823                    }
824                    TopKPlan::MonotonicTopK(plan) => {
825                        write!(f, "{}TopK::MonotonicTopK", ctx.indent)?;
826                        if plan.group_key.len() > 0 {
827                            let group_by = mode.seq(&plan.group_key, None);
828                            let group_by = CompactScalars(group_by);
829                            write!(f, " group_by=[{}]", group_by)?;
830                        }
831                        if plan.order_key.len() > 0 {
832                            let order_by = mode.seq(&plan.order_key, None);
833                            let order_by = separated(", ", order_by);
834                            write!(f, " order_by=[{}]", order_by)?;
835                        }
836                        if let Some(limit) = &plan.limit {
837                            let limit = mode.expr(limit, None);
838                            write!(f, " limit={}", limit)?;
839                        }
840                        if plan.must_consolidate {
841                            write!(f, " must_consolidate")?;
842                        }
843                    }
844                    TopKPlan::Basic(plan) => {
845                        write!(f, "{}TopK::Basic", ctx.indent)?;
846                        if plan.group_key.len() > 0 {
847                            let group_by = mode.seq(&plan.group_key, None);
848                            let group_by = CompactScalars(group_by);
849                            write!(f, " group_by=[{}]", group_by)?;
850                        }
851                        if plan.order_key.len() > 0 {
852                            let order_by = mode.seq(&plan.order_key, None);
853                            let order_by = separated(", ", order_by);
854                            write!(f, " order_by=[{}]", order_by)?;
855                        }
856                        if let Some(limit) = &plan.limit {
857                            let limit = mode.expr(limit, None);
858                            write!(f, " limit={}", limit)?;
859                        }
860                        if &plan.offset > &0 {
861                            write!(f, " offset={}", plan.offset)?;
862                        }
863                    }
864                }
865                writeln!(f, "{}", annotations)?;
866                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
867            }
868            Negate { input } => {
869                writeln!(f, "{}Negate{}", ctx.indent, annotations)?;
870                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
871            }
872            Threshold {
873                input,
874                threshold_plan,
875            } => {
876                use crate::plan::threshold::ThresholdPlan;
877                match threshold_plan {
878                    ThresholdPlan::Basic(plan) => {
879                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
880                        write!(f, "{}Threshold::Basic", ctx.indent)?;
881                        write!(f, " ensure_arrangement=")?;
882                        ensure_arrangement.fmt_text(f, ctx)?;
883                        writeln!(f, "{}", annotations)?;
884                    }
885                };
886                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
887            }
888            Union {
889                inputs,
890                consolidate_output,
891            } => {
892                if *consolidate_output {
893                    writeln!(
894                        f,
895                        "{}Union consolidate_output={}{}",
896                        ctx.indent, consolidate_output, annotations
897                    )?;
898                } else {
899                    writeln!(f, "{}Union{}", ctx.indent, annotations)?;
900                }
901                ctx.indented(|ctx| {
902                    for input in inputs.iter() {
903                        input.fmt_text(f, ctx)?;
904                    }
905                    Ok(())
906                })?;
907            }
908            ArrangeBy {
909                input_key,
910                input,
911                input_mfp,
912                forms,
913            } => {
914                writeln!(f, "{}ArrangeBy{}", ctx.indent, annotations)?;
915                ctx.indented(|ctx| {
916                    if let Some(key) = input_key {
917                        let key = mode.seq(key, None);
918                        let key = CompactScalars(key);
919                        writeln!(f, "{}input_key=[{}]", ctx.indent, key)?;
920                    }
921                    mode.expr(input_mfp, None).fmt_text(f, ctx)?;
922                    forms.fmt_text(f, ctx)?;
923                    // Render input
924                    input.fmt_text(f, ctx)
925                })?;
926            }
927        }
928
929        Ok(())
930    }
931}
932
933impl DisplayText<PlanRenderingContext<'_, Plan>> for AvailableCollections {
934    fn fmt_text(
935        &self,
936        f: &mut fmt::Formatter<'_>,
937        ctx: &mut PlanRenderingContext<'_, Plan>,
938    ) -> fmt::Result {
939        if ctx.config.verbose_syntax {
940            self.fmt_verbose_text(f, ctx)
941        } else {
942            self.fmt_default_text(f, ctx)
943        }
944    }
945}
946impl AvailableCollections {
947    fn fmt_default_text(
948        &self,
949        f: &mut fmt::Formatter<'_>,
950        ctx: &mut PlanRenderingContext<'_, Plan>,
951    ) -> fmt::Result {
952        let plural = if self.arranged.len() == 1 { "" } else { "s" };
953        write!(
954            f,
955            "{}Keys: {} arrangement{plural} available",
956            ctx.indent,
957            self.arranged.len()
958        )?;
959
960        if self.raw {
961            writeln!(f, ", plus raw stream")?;
962        } else {
963            writeln!(f, ", no raw stream")?;
964        }
965
966        ctx.indented(|ctx| {
967            for (i, arrangement) in self.arranged.iter().enumerate() {
968                let arrangement = Arrangement::from(arrangement);
969                write!(f, "{}Arrangement {i}: ", ctx.indent)?;
970                arrangement.fmt_text(f, ctx)?;
971                writeln!(f, "")?;
972            }
973            Ok(())
974        })?;
975
976        Ok(())
977    }
978
979    fn fmt_verbose_text(
980        &self,
981        f: &mut fmt::Formatter<'_>,
982        ctx: &mut PlanRenderingContext<'_, Plan>,
983    ) -> fmt::Result {
984        // raw field
985        let raw = &self.raw;
986        writeln!(f, "{}raw={}", ctx.indent, raw)?;
987        // arranged field
988        for (i, arrangement) in self.arranged.iter().enumerate() {
989            let arrangement = Arrangement::from(arrangement);
990            write!(f, "{}arrangements[{}]=", ctx.indent, i)?;
991            arrangement.fmt_text(f, ctx)?;
992            writeln!(f, "")?;
993        }
994        Ok(())
995    }
996}
997
998impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearJoinPlan {
999    fn fmt_text(
1000        &self,
1001        f: &mut fmt::Formatter<'_>,
1002        ctx: &mut PlanRenderingContext<'_, Plan>,
1003    ) -> fmt::Result {
1004        if ctx.config.verbose_syntax {
1005            self.fmt_verbose_text(f, ctx)
1006        } else {
1007            self.fmt_default_text(f, ctx)
1008        }
1009    }
1010}
1011impl LinearJoinPlan {
1012    #[allow(clippy::needless_pass_by_ref_mut)]
1013    fn fmt_default_text(
1014        &self,
1015        f: &mut fmt::Formatter<'_>,
1016        ctx: &mut PlanRenderingContext<'_, Plan>,
1017    ) -> fmt::Result {
1018        for (i, plan) in self.stage_plans.iter().enumerate().rev() {
1019            let lookup_relation = &plan.lookup_relation;
1020            write!(f, "{}Join stage {i} in %{lookup_relation}", ctx.indent)?;
1021            if !plan.lookup_key.is_empty() {
1022                let lookup_key = CompactScalarSeq(&plan.lookup_key);
1023                writeln!(f, " with lookup key {lookup_key}",)?;
1024            } else {
1025                writeln!(f)?;
1026            }
1027            if plan.closure.maps_or_filters() {
1028                ctx.indented(|ctx| plan.closure.fmt_default_text(f, ctx))?;
1029            }
1030        }
1031        if let Some(final_closure) = &self.final_closure {
1032            if final_closure.maps_or_filters() {
1033                ctx.indented(|ctx| {
1034                    writeln!(f, "{}Final closure:", ctx.indent)?;
1035                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1036                })?;
1037            }
1038        }
1039        Ok(())
1040    }
1041
1042    fn fmt_verbose_text(
1043        &self,
1044        f: &mut fmt::Formatter<'_>,
1045        ctx: &mut PlanRenderingContext<'_, Plan>,
1046    ) -> fmt::Result {
1047        let mode = HumanizedExplain::new(ctx.config.redacted);
1048        let plan = self;
1049        if let Some(closure) = plan.final_closure.as_ref() {
1050            if !closure.is_identity() {
1051                writeln!(f, "{}final_closure", ctx.indent)?;
1052                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1053            }
1054        }
1055        for (i, plan) in plan.stage_plans.iter().enumerate() {
1056            writeln!(f, "{}linear_stage[{}]", ctx.indent, i)?;
1057            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1058        }
1059        if let Some(closure) = plan.initial_closure.as_ref() {
1060            if !closure.is_identity() {
1061                writeln!(f, "{}initial_closure", ctx.indent)?;
1062                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1063            }
1064        }
1065        match &plan.source_key {
1066            Some(source_key) => {
1067                let source_key = mode.seq(source_key, None);
1068                let source_key = CompactScalars(source_key);
1069                writeln!(
1070                    f,
1071                    "{}source={{ relation={}, key=[{}] }}",
1072                    ctx.indent, &plan.source_relation, source_key
1073                )?
1074            }
1075            None => writeln!(
1076                f,
1077                "{}source={{ relation={}, key=[] }}",
1078                ctx.indent, &plan.source_relation
1079            )?,
1080        };
1081        Ok(())
1082    }
1083}
1084
1085impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearStagePlan {
1086    fn fmt_text(
1087        &self,
1088        f: &mut fmt::Formatter<'_>,
1089        ctx: &mut PlanRenderingContext<'_, Plan>,
1090    ) -> fmt::Result {
1091        if ctx.config.verbose_syntax {
1092            self.fmt_verbose_text(f, ctx)
1093        } else {
1094            self.fmt_default_text(f, ctx)
1095        }
1096    }
1097}
1098impl LinearStagePlan {
1099    #[allow(clippy::needless_pass_by_ref_mut)]
1100    fn fmt_default_text(
1101        &self,
1102        f: &mut fmt::Formatter<'_>,
1103        ctx: &mut PlanRenderingContext<'_, Plan>,
1104    ) -> fmt::Result {
1105        // NB this code path should not be live, as fmt_default_text for
1106        // `LinearJoinPlan` prints out each stage already
1107        let lookup_relation = &self.lookup_relation;
1108        if !self.lookup_key.is_empty() {
1109            let lookup_key = CompactScalarSeq(&self.lookup_key);
1110            writeln!(
1111                f,
1112                "{}Lookup key {lookup_key} in %{lookup_relation}",
1113                ctx.indent
1114            )
1115        } else {
1116            writeln!(f, "{}Lookup in %{lookup_relation}", ctx.indent)
1117        }
1118    }
1119
1120    fn fmt_verbose_text(
1121        &self,
1122        f: &mut fmt::Formatter<'_>,
1123        ctx: &mut PlanRenderingContext<'_, Plan>,
1124    ) -> fmt::Result {
1125        let mode = HumanizedExplain::new(ctx.config.redacted);
1126
1127        let plan = self;
1128        if !plan.closure.is_identity() {
1129            writeln!(f, "{}closure", ctx.indent)?;
1130            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1131        }
1132        {
1133            let lookup_relation = &plan.lookup_relation;
1134            let lookup_key = CompactScalarSeq(&plan.lookup_key);
1135            writeln!(
1136                f,
1137                "{}lookup={{ relation={}, key=[{}] }}",
1138                ctx.indent, lookup_relation, lookup_key
1139            )?;
1140        }
1141        {
1142            let stream_key = mode.seq(&plan.stream_key, None);
1143            let stream_key = CompactScalars(stream_key);
1144            let stream_thinning = Indices(&plan.stream_thinning);
1145            writeln!(
1146                f,
1147                "{}stream={{ key=[{}], thinning=({}) }}",
1148                ctx.indent, stream_key, stream_thinning
1149            )?;
1150        }
1151        Ok(())
1152    }
1153}
1154
1155impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaJoinPlan {
1156    fn fmt_text(
1157        &self,
1158        f: &mut fmt::Formatter<'_>,
1159        ctx: &mut PlanRenderingContext<'_, Plan>,
1160    ) -> fmt::Result {
1161        if ctx.config.verbose_syntax {
1162            self.fmt_verbose_text(f, ctx)
1163        } else {
1164            self.fmt_default_text(f, ctx)
1165        }
1166    }
1167}
1168impl DeltaJoinPlan {
1169    fn fmt_default_text(
1170        &self,
1171        f: &mut fmt::Formatter<'_>,
1172        ctx: &mut PlanRenderingContext<'_, Plan>,
1173    ) -> fmt::Result {
1174        for (i, plan) in self.path_plans.iter().enumerate() {
1175            writeln!(f, "{}Delta join path for input %{i}", ctx.indent)?;
1176            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1177        }
1178        Ok(())
1179    }
1180
1181    fn fmt_verbose_text(
1182        &self,
1183        f: &mut fmt::Formatter<'_>,
1184        ctx: &mut PlanRenderingContext<'_, Plan>,
1185    ) -> fmt::Result {
1186        for (i, plan) in self.path_plans.iter().enumerate() {
1187            writeln!(f, "{}plan_path[{}]", ctx.indent, i)?;
1188            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1189        }
1190        Ok(())
1191    }
1192}
1193
1194impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaPathPlan {
1195    fn fmt_text(
1196        &self,
1197        f: &mut fmt::Formatter<'_>,
1198        ctx: &mut PlanRenderingContext<'_, Plan>,
1199    ) -> fmt::Result {
1200        if ctx.config.verbose_syntax {
1201            self.fmt_verbose_text(f, ctx)
1202        } else {
1203            self.fmt_default_text(f, ctx)
1204        }
1205    }
1206}
1207
1208impl DeltaPathPlan {
1209    #[allow(clippy::needless_pass_by_ref_mut)]
1210    fn fmt_default_text(
1211        &self,
1212        f: &mut fmt::Formatter<'_>,
1213        ctx: &mut PlanRenderingContext<'_, Plan>,
1214    ) -> fmt::Result {
1215        for (
1216            i,
1217            DeltaStagePlan {
1218                lookup_key,
1219                lookup_relation,
1220                closure,
1221                ..
1222            },
1223        ) in self.stage_plans.iter().enumerate()
1224        {
1225            if !lookup_key.is_empty() {
1226                let lookup_key = CompactScalarSeq(lookup_key);
1227                writeln!(
1228                    f,
1229                    "{}stage {i} for %{lookup_relation}: lookup key {lookup_key}",
1230                    ctx.indent
1231                )?;
1232            } else {
1233                writeln!(f, "{}stage %{i} for  %{lookup_relation}", ctx.indent)?;
1234            }
1235            if closure.maps_or_filters() {
1236                ctx.indented(|ctx| closure.fmt_default_text(f, ctx))?;
1237            }
1238        }
1239        if let Some(final_closure) = &self.final_closure {
1240            if final_closure.maps_or_filters() {
1241                ctx.indented(|ctx| {
1242                    writeln!(f, "{}Final closure:", ctx.indent)?;
1243                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1244                })?;
1245            }
1246        }
1247        Ok(())
1248    }
1249
1250    fn fmt_verbose_text(
1251        &self,
1252        f: &mut fmt::Formatter<'_>,
1253        ctx: &mut PlanRenderingContext<'_, Plan>,
1254    ) -> fmt::Result {
1255        let mode = HumanizedExplain::new(ctx.config.redacted);
1256        let plan = self;
1257        if let Some(closure) = plan.final_closure.as_ref() {
1258            if !closure.is_identity() {
1259                writeln!(f, "{}final_closure", ctx.indent)?;
1260                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1261            }
1262        }
1263        for (i, plan) in plan.stage_plans.iter().enumerate().rev() {
1264            writeln!(f, "{}delta_stage[{}]", ctx.indent, i)?;
1265            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1266        }
1267        if !plan.initial_closure.is_identity() {
1268            writeln!(f, "{}initial_closure", ctx.indent)?;
1269            ctx.indented(|ctx| plan.initial_closure.fmt_text(f, ctx))?;
1270        }
1271        {
1272            let source_relation = &plan.source_relation;
1273            let source_key = mode.seq(&plan.source_key, None);
1274            let source_key = CompactScalars(source_key);
1275            writeln!(
1276                f,
1277                "{}source={{ relation={}, key=[{}] }}",
1278                ctx.indent, source_relation, source_key
1279            )?;
1280        }
1281        Ok(())
1282    }
1283}
1284
1285impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaStagePlan {
1286    fn fmt_text(
1287        &self,
1288        f: &mut fmt::Formatter<'_>,
1289        ctx: &mut PlanRenderingContext<'_, Plan>,
1290    ) -> fmt::Result {
1291        if ctx.config.verbose_syntax {
1292            self.fmt_verbose_text(f, ctx)
1293        } else {
1294            self.fmt_default_text(f, ctx)
1295        }
1296    }
1297}
1298impl DeltaStagePlan {
1299    #[allow(clippy::needless_pass_by_ref_mut)]
1300    fn fmt_default_text(
1301        &self,
1302        f: &mut fmt::Formatter<'_>,
1303        ctx: &mut PlanRenderingContext<'_, Plan>,
1304    ) -> fmt::Result {
1305        // NB this code path should not be live, as fmt_default_text for
1306        // `DeltaPathPlan` prints out each stage already
1307        let lookup_relation = &self.lookup_relation;
1308        let lookup_key = CompactScalarSeq(&self.lookup_key);
1309        writeln!(
1310            f,
1311            "{}Lookup key {lookup_key} in %{lookup_relation}",
1312            ctx.indent
1313        )
1314    }
1315
1316    fn fmt_verbose_text(
1317        &self,
1318        f: &mut fmt::Formatter<'_>,
1319        ctx: &mut PlanRenderingContext<'_, Plan>,
1320    ) -> fmt::Result {
1321        let mode = HumanizedExplain::new(ctx.config.redacted);
1322        let plan = self;
1323        if !plan.closure.is_identity() {
1324            writeln!(f, "{}closure", ctx.indent)?;
1325            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1326        }
1327        {
1328            let lookup_relation = &plan.lookup_relation;
1329            let lookup_key = mode.seq(&plan.lookup_key, None);
1330            let lookup_key = CompactScalars(lookup_key);
1331            writeln!(
1332                f,
1333                "{}lookup={{ relation={}, key=[{}] }}",
1334                ctx.indent, lookup_relation, lookup_key
1335            )?;
1336        }
1337        {
1338            let stream_key = mode.seq(&plan.stream_key, None);
1339            let stream_key = CompactScalars(stream_key);
1340            let stream_thinning = mode.seq(&plan.stream_thinning, None);
1341            let stream_thinning = CompactScalars(stream_thinning);
1342            writeln!(
1343                f,
1344                "{}stream={{ key=[{}], thinning=({}) }}",
1345                ctx.indent, stream_key, stream_thinning
1346            )?;
1347        }
1348        Ok(())
1349    }
1350}
1351
1352impl DisplayText<PlanRenderingContext<'_, Plan>> for JoinClosure {
1353    fn fmt_text(
1354        &self,
1355        f: &mut fmt::Formatter<'_>,
1356        ctx: &mut PlanRenderingContext<'_, Plan>,
1357    ) -> fmt::Result {
1358        if ctx.config.verbose_syntax {
1359            self.fmt_verbose_text(f, ctx)
1360        } else {
1361            self.fmt_default_text(f, ctx)
1362        }
1363    }
1364}
1365impl JoinClosure {
1366    fn fmt_default_text(
1367        &self,
1368        f: &mut fmt::Formatter<'_>,
1369        ctx: &mut PlanRenderingContext<'_, Plan>,
1370    ) -> fmt::Result {
1371        let mode = HumanizedExplain::new(ctx.config.redacted);
1372        if !self.before.expressions.is_empty() || !self.before.predicates.is_empty() {
1373            mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1374        }
1375        if !self.ready_equivalences.is_empty() {
1376            let equivalences = separated(
1377                " AND ",
1378                self.ready_equivalences
1379                    .iter()
1380                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1381            );
1382            writeln!(f, "{}Equivalences: {equivalences}", ctx.indent)?;
1383        }
1384        Ok(())
1385    }
1386
1387    fn fmt_verbose_text(
1388        &self,
1389        f: &mut fmt::Formatter<'_>,
1390        ctx: &mut PlanRenderingContext<'_, Plan>,
1391    ) -> fmt::Result {
1392        let mode = HumanizedExplain::new(ctx.config.redacted);
1393        mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1394        if !self.ready_equivalences.is_empty() {
1395            let equivalences = separated(
1396                " AND ",
1397                self.ready_equivalences
1398                    .iter()
1399                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1400            );
1401            writeln!(f, "{}ready_equivalences={}", ctx.indent, equivalences)?;
1402        }
1403        Ok(())
1404    }
1405}
1406
1407impl DisplayText<PlanRenderingContext<'_, Plan>> for AccumulablePlan {
1408    fn fmt_text(
1409        &self,
1410        f: &mut fmt::Formatter<'_>,
1411        ctx: &mut PlanRenderingContext<'_, Plan>,
1412    ) -> fmt::Result {
1413        if ctx.config.verbose_syntax {
1414            self.fmt_verbose_text(f, ctx)
1415        } else {
1416            self.fmt_default_text(f, ctx)
1417        }
1418    }
1419}
1420impl AccumulablePlan {
1421    #[allow(clippy::needless_pass_by_ref_mut)]
1422    fn fmt_default_text(
1423        &self,
1424        f: &mut fmt::Formatter<'_>,
1425        ctx: &mut PlanRenderingContext<'_, Plan>,
1426    ) -> fmt::Result {
1427        let mode = HumanizedExplain::new(ctx.config.redacted);
1428
1429        if !self.simple_aggrs.is_empty() {
1430            let simple_aggrs = self
1431                .simple_aggrs
1432                .iter()
1433                .map(|(_i_aggs, _i_datum, agg)| mode.expr(agg, None));
1434            let simple_aggrs = separated(", ", simple_aggrs);
1435            writeln!(f, "{}Simple aggregates: {simple_aggrs}", ctx.indent)?;
1436        }
1437
1438        if !self.distinct_aggrs.is_empty() {
1439            let distinct_aggrs = self
1440                .distinct_aggrs
1441                .iter()
1442                .map(|(_i_aggs, _i_datum, agg)| mode.expr(agg, None));
1443            let distinct_aggrs = separated(", ", distinct_aggrs);
1444            writeln!(f, "{}Distinct aggregates: {distinct_aggrs}", ctx.indent)?;
1445        }
1446        Ok(())
1447    }
1448
1449    #[allow(clippy::needless_pass_by_ref_mut)]
1450    fn fmt_verbose_text(
1451        &self,
1452        f: &mut fmt::Formatter<'_>,
1453        ctx: &mut PlanRenderingContext<'_, Plan>,
1454    ) -> fmt::Result {
1455        let mode = HumanizedExplain::new(ctx.config.redacted);
1456        // full_aggrs (skipped because they are repeated in simple_aggrs ∪ distinct_aggrs)
1457        // for (i, aggr) in self.full_aggrs.iter().enumerate() {
1458        //     write!(f, "{}full_aggrs[{}]=", ctx.indent, i)?;
1459        //     aggr.fmt_text(f, &mut ())?;
1460        //     writeln!(f)?;
1461        // }
1462        // simple_aggrs
1463        for (i, (i_aggs, i_datum, agg)) in self.simple_aggrs.iter().enumerate() {
1464            let agg = mode.expr(agg, None);
1465            write!(f, "{}simple_aggrs[{}]=", ctx.indent, i)?;
1466            writeln!(f, "({}, {}, {})", i_aggs, i_datum, agg)?;
1467        }
1468        // distinct_aggrs
1469        for (i, (i_aggs, i_datum, agg)) in self.distinct_aggrs.iter().enumerate() {
1470            let agg = mode.expr(agg, None);
1471            write!(f, "{}distinct_aggrs[{}]=", ctx.indent, i)?;
1472            writeln!(f, "({}, {}, {})", i_aggs, i_datum, agg)?;
1473        }
1474        Ok(())
1475    }
1476}
1477
1478impl DisplayText<PlanRenderingContext<'_, Plan>> for HierarchicalPlan {
1479    fn fmt_text(
1480        &self,
1481        f: &mut fmt::Formatter<'_>,
1482        ctx: &mut PlanRenderingContext<'_, Plan>,
1483    ) -> fmt::Result {
1484        if ctx.config.verbose_syntax {
1485            self.fmt_verbose_text(f, ctx)
1486        } else {
1487            self.fmt_default_text(f, ctx)
1488        }
1489    }
1490}
1491impl HierarchicalPlan {
1492    #[allow(clippy::needless_pass_by_ref_mut)]
1493    fn fmt_default_text(
1494        &self,
1495        f: &mut fmt::Formatter<'_>,
1496        ctx: &mut PlanRenderingContext<'_, Plan>,
1497    ) -> fmt::Result {
1498        let mode = HumanizedExplain::new(ctx.config.redacted);
1499        let aggr_funcs = mode.seq(self.aggr_funcs(), None);
1500        let aggr_funcs = separated(", ", aggr_funcs);
1501        writeln!(f, "{}Aggregations: {aggr_funcs}", ctx.indent)
1502    }
1503
1504    #[allow(clippy::needless_pass_by_ref_mut)]
1505    fn fmt_verbose_text(
1506        &self,
1507        f: &mut fmt::Formatter<'_>,
1508        ctx: &mut PlanRenderingContext<'_, Plan>,
1509    ) -> fmt::Result {
1510        let mode = HumanizedExplain::new(ctx.config.redacted);
1511        match self {
1512            HierarchicalPlan::Monotonic(plan) => {
1513                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1514                let aggr_funcs = separated(", ", aggr_funcs);
1515                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1516                let skips = separated(", ", &plan.skips);
1517                writeln!(f, "{}skips=[{}]", ctx.indent, skips)?;
1518                writeln!(f, "{}monotonic", ctx.indent)?;
1519                if plan.must_consolidate {
1520                    writeln!(f, "{}must_consolidate", ctx.indent)?;
1521                }
1522            }
1523            HierarchicalPlan::Bucketed(plan) => {
1524                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1525                let aggr_funcs = separated(", ", aggr_funcs);
1526                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1527                let skips = separated(", ", &plan.skips);
1528                writeln!(f, "{}skips=[{}]", ctx.indent, skips)?;
1529                let buckets = separated(", ", &plan.buckets);
1530                writeln!(f, "{}buckets=[{}]", ctx.indent, buckets)?;
1531            }
1532        }
1533        Ok(())
1534    }
1535}
1536
1537impl DisplayText<PlanRenderingContext<'_, Plan>> for BasicPlan {
1538    fn fmt_text(
1539        &self,
1540        f: &mut fmt::Formatter<'_>,
1541        ctx: &mut PlanRenderingContext<'_, Plan>,
1542    ) -> fmt::Result {
1543        if ctx.config.verbose_syntax {
1544            self.fmt_verbose_text(f, ctx)
1545        } else {
1546            self.fmt_default_text(f, ctx)
1547        }
1548    }
1549}
1550impl BasicPlan {
1551    #[allow(clippy::needless_pass_by_ref_mut)]
1552    fn fmt_default_text(
1553        &self,
1554        f: &mut fmt::Formatter<'_>,
1555        ctx: &mut PlanRenderingContext<'_, Plan>,
1556    ) -> fmt::Result {
1557        let mode = HumanizedExplain::new(ctx.config.redacted);
1558        match self {
1559            BasicPlan::Single(SingleBasicPlan {
1560                index: _,
1561                expr,
1562                fused_unnest_list: _,
1563            }) => {
1564                let agg = mode.expr(expr, None);
1565                writeln!(f, "{}Aggregation: {agg}", ctx.indent)?;
1566            }
1567            BasicPlan::Multiple(aggs) => {
1568                let mode = HumanizedExplain::new(ctx.config.redacted);
1569                write!(f, "{}Aggregations:", ctx.indent)?;
1570
1571                for (_, agg) in aggs.iter() {
1572                    let agg = mode.expr(agg, None);
1573                    write!(f, " {agg}")?;
1574                }
1575                writeln!(f)?;
1576            }
1577        }
1578        Ok(())
1579    }
1580
1581    #[allow(clippy::needless_pass_by_ref_mut)]
1582    fn fmt_verbose_text(
1583        &self,
1584        f: &mut fmt::Formatter<'_>,
1585        ctx: &mut PlanRenderingContext<'_, Plan>,
1586    ) -> fmt::Result {
1587        let mode = HumanizedExplain::new(ctx.config.redacted);
1588        match self {
1589            BasicPlan::Single(SingleBasicPlan {
1590                index,
1591                expr,
1592                fused_unnest_list,
1593            }) => {
1594                let agg = mode.expr(expr, None);
1595                let fused_unnest_list = if *fused_unnest_list {
1596                    ", fused_unnest_list=true"
1597                } else {
1598                    ""
1599                };
1600                writeln!(
1601                    f,
1602                    "{}aggr=({}, {}{})",
1603                    ctx.indent, index, agg, fused_unnest_list
1604                )?;
1605            }
1606            BasicPlan::Multiple(aggs) => {
1607                for (i, (i_datum, agg)) in aggs.iter().enumerate() {
1608                    let agg = mode.expr(agg, None);
1609                    writeln!(f, "{}aggrs[{}]=({}, {})", ctx.indent, i, i_datum, agg)?;
1610                }
1611            }
1612        }
1613        Ok(())
1614    }
1615}
1616
1617impl DisplayText<PlanRenderingContext<'_, Plan>> for CollationPlan {
1618    fn fmt_text(
1619        &self,
1620        f: &mut fmt::Formatter<'_>,
1621        ctx: &mut PlanRenderingContext<'_, Plan>,
1622    ) -> fmt::Result {
1623        if ctx.config.verbose_syntax {
1624            self.fmt_verbose_text(f, ctx)
1625        } else {
1626            self.fmt_default_text(f, ctx)
1627        }
1628    }
1629}
1630
1631impl CollationPlan {
1632    #[allow(clippy::needless_pass_by_ref_mut)]
1633    fn fmt_default_text(
1634        &self,
1635        f: &mut fmt::Formatter<'_>,
1636        ctx: &mut PlanRenderingContext<'_, Plan>,
1637    ) -> fmt::Result {
1638        if let Some(plan) = &self.accumulable {
1639            writeln!(f, "{}Accumulable sub-aggregation", ctx.indent)?;
1640            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1641        }
1642        if let Some(plan) = &self.hierarchical {
1643            writeln!(f, "{}Hierarchical sub-aggregation", ctx.indent)?;
1644            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1645        }
1646        if let Some(plan) = &self.basic {
1647            writeln!(f, "{}Non-incremental sub-aggregation", ctx.indent)?;
1648            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1649        }
1650        Ok(())
1651    }
1652
1653    fn fmt_verbose_text(
1654        &self,
1655        f: &mut fmt::Formatter<'_>,
1656        ctx: &mut PlanRenderingContext<'_, Plan>,
1657    ) -> fmt::Result {
1658        {
1659            use crate::plan::reduce::ReductionType;
1660            let aggregate_types = &self
1661                .aggregate_types
1662                .iter()
1663                .map(|reduction_type| match reduction_type {
1664                    ReductionType::Accumulable => "a".to_string(),
1665                    ReductionType::Hierarchical => "h".to_string(),
1666                    ReductionType::Basic => "b".to_string(),
1667                })
1668                .collect::<Vec<_>>();
1669            let aggregate_types = separated(", ", aggregate_types);
1670            writeln!(f, "{}aggregate_types=[{}]", ctx.indent, aggregate_types)?;
1671        }
1672        if let Some(plan) = &self.accumulable {
1673            writeln!(f, "{}accumulable", ctx.indent)?;
1674            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1675        }
1676        if let Some(plan) = &self.hierarchical {
1677            writeln!(f, "{}hierarchical", ctx.indent)?;
1678            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1679        }
1680        if let Some(plan) = &self.basic {
1681            writeln!(f, "{}basic", ctx.indent)?;
1682            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1683        }
1684        Ok(())
1685    }
1686}
1687
1688/// Helper struct for rendering an arrangement.
1689struct Arrangement<'a> {
1690    key: &'a Vec<MirScalarExpr>,
1691    permutation: Permutation<'a>,
1692    thinning: &'a Vec<usize>,
1693}
1694
1695impl<'a> From<&'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> for Arrangement<'a> {
1696    fn from(
1697        (key, permutation, thinning): &'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
1698    ) -> Self {
1699        Arrangement {
1700            key,
1701            permutation: Permutation(permutation),
1702            thinning,
1703        }
1704    }
1705}
1706
1707impl<'a> DisplayText<PlanRenderingContext<'_, Plan>> for Arrangement<'a> {
1708    fn fmt_text(
1709        &self,
1710        f: &mut fmt::Formatter<'_>,
1711        ctx: &mut PlanRenderingContext<'_, Plan>,
1712    ) -> fmt::Result {
1713        if ctx.config.verbose_syntax {
1714            self.fmt_verbose_text(f, ctx)
1715        } else {
1716            self.fmt_default_text(f, ctx)
1717        }
1718    }
1719}
1720
1721impl<'a> Arrangement<'a> {
1722    #[allow(clippy::needless_pass_by_ref_mut)]
1723    fn fmt_default_text(
1724        &self,
1725        f: &mut fmt::Formatter<'_>,
1726        ctx: &PlanRenderingContext<'_, Plan>,
1727    ) -> fmt::Result {
1728        let mode = HumanizedExplain::new(ctx.config.redacted);
1729        if !self.key.is_empty() {
1730            let key = mode.seq(self.key, None);
1731            let key = CompactScalars(key);
1732            write!(f, "{key}")
1733        } else {
1734            write!(f, "(empty key)")
1735        }
1736    }
1737
1738    #[allow(clippy::needless_pass_by_ref_mut)]
1739    fn fmt_verbose_text(
1740        &self,
1741        f: &mut fmt::Formatter<'_>,
1742        ctx: &PlanRenderingContext<'_, Plan>,
1743    ) -> fmt::Result {
1744        let mode = HumanizedExplain::new(ctx.config.redacted);
1745        // prepare key
1746        let key = mode.seq(self.key, None);
1747        let key = CompactScalars(key);
1748        // prepare perumation map
1749        let permutation = &self.permutation;
1750        // prepare thinning
1751        let thinning = Indices(self.thinning);
1752        // write the arrangement spec
1753        write!(
1754            f,
1755            "{{ key=[{}], permutation={}, thinning=({}) }}",
1756            key, permutation, thinning
1757        )
1758    }
1759}
1760
1761/// Helper struct for rendering a permutation.
1762struct Permutation<'a>(&'a Vec<usize>);
1763
1764impl<'a> fmt::Display for Permutation<'a> {
1765    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1766        let mut pairs = vec![];
1767        for (x, y) in self.0.iter().enumerate().filter(|(x, y)| x != *y) {
1768            pairs.push(format!("#{}: #{}", x, y));
1769        }
1770
1771        if pairs.len() > 0 {
1772            write!(f, "{{{}}}", separated(", ", pairs))
1773        } else {
1774            write!(f, "id")
1775        }
1776    }
1777}
1778
1779/// Annotations for physical plans.
1780struct PlanAnnotations {
1781    config: ExplainConfig,
1782    node_id: LirId,
1783}
1784
1785// The current implementation deviates from the `AnnotatedPlan` used in `Mir~`-based plans. This is
1786// fine, since at the moment the only attribute we are going to explain is the `node_id`, which at
1787// the moment is kept inline with the `Plan` variants. If at some point in the future we want to
1788// start deriving and printing attributes that are derived ad-hoc, however, we might want to adopt
1789// `AnnotatedPlan` here as well.
1790impl PlanAnnotations {
1791    fn new(config: ExplainConfig, plan: &Plan) -> Self {
1792        let node_id = plan.lir_id;
1793        Self { config, node_id }
1794    }
1795}
1796
1797impl fmt::Display for PlanAnnotations {
1798    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1799        if self.config.node_ids {
1800            f.debug_struct(" //")
1801                .field("node_id", &self.node_id)
1802                .finish()
1803        } else {
1804            // No physical plan annotations enabled.
1805            Ok(())
1806        }
1807    }
1808}