Skip to main content

mz_compute_types/explain/
text.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `EXPLAIN ... AS TEXT` support for LIR structures.
11//!
12//! The format adheres to the following conventions:
13//! 1. In general, every line that starts with an uppercase character
14//!    corresponds to a [`Plan`] variant.
15//! 2. Whenever the variant has an attached `~Plan`, the printed name is
16//!    `$V::$P` where `$V` identifies the variant and `$P` the plan.
//! 3. The fields of a `~Plan` struct attached to a [`Plan`] are rendered as if
//!    they were part of the variant itself.
19//! 4. Non-recursive parameters of each sub-plan are written as `$key=$val`
20//!    pairs on the same line or as lowercase `$key` fields on indented lines.
21//! 5. A single non-recursive parameter can be written just as `$val`.
22
23use std::fmt;
24use std::ops::Deref;
25
26use itertools::Itertools;
27use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
28use mz_expr::{Id, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_ore::str::{IndentLike, StrExt, separated};
31use mz_repr::explain::text::DisplayText;
32use mz_repr::explain::{
33    CompactScalarSeq, CompactScalars, ExplainConfig, Indices, PlanRenderingContext,
34};
35
36use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
37use crate::plan::join::linear_join::LinearStagePlan;
38use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
39use crate::plan::reduce::{
40    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, MonotonicPlan, SingleBasicPlan,
41};
42use crate::plan::threshold::ThresholdPlan;
43use crate::plan::{ArrangementStrategy, AvailableCollections, LirId, Plan, PlanNode};
44
45impl DisplayText<PlanRenderingContext<'_, Plan>> for Plan {
46    fn fmt_text(
47        &self,
48        f: &mut fmt::Formatter<'_>,
49        ctx: &mut PlanRenderingContext<'_, Plan>,
50    ) -> fmt::Result {
51        if ctx.config.verbose_syntax {
52            self.fmt_verbose_text(f, ctx)
53        } else {
54            self.fmt_default_text(f, ctx)
55        }
56    }
57}
58
59impl Plan {
60    // NOTE: This code needs to be kept in sync with the `Display` instance for
61    // `RenderPlan:ExprHumanizer`.
62    //
63    // This code determines what you see in `EXPLAIN`; that other code
64    // determine what you see when you run `mz_lir_mapping`.
65    fn fmt_default_text(
66        &self,
67        f: &mut fmt::Formatter<'_>,
68        ctx: &mut PlanRenderingContext<'_, Plan>,
69    ) -> fmt::Result {
70        use PlanNode::*;
71
72        let mode = HumanizedExplain::new(ctx.config.redacted);
73        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
74
75        match &self.node {
76            Constant { rows } => {
77                write!(f, "{}→Constant ", ctx.indent)?;
78
79                match rows {
80                    Ok(rows) => write!(
81                        f,
82                        "({} row{})",
83                        rows.len(),
84                        if rows.len() == 1 { "" } else { "s" }
85                    )?,
86                    Err(err) => {
87                        if mode.redacted() {
88                            write!(f, "(error: █)")?;
89                        } else {
90                            write!(f, "(error: {})", err.to_string().quoted(),)?;
91                        }
92                    }
93                }
94
95                writeln!(f, "{annotations}")?;
96            }
97            Get { id, keys, plan } => {
98                ctx.indent.set(); // mark the current indent level
99
100                // Resolve the id as a string.
101                let id = match id {
102                    Id::Local(id) => id.to_string(),
103                    Id::Global(id) => ctx
104                        .humanizer
105                        .humanize_id(*id)
106                        .unwrap_or_else(|| id.to_string()),
107                };
108                // Render plan-specific fields.
109                use crate::plan::GetPlan;
110                match plan {
111                    GetPlan::PassArrangements => {
112                        if keys.raw && keys.arranged.is_empty() {
113                            writeln!(f, "{}→Stream {id}{annotations}", ctx.indent)?;
114                        } else {
115                            // we're not reporting on whether or not `raw` is set
116                            // we're not reporting on how many arrangements there are
117                            writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
118                        }
119                    }
120                    GetPlan::Arrangement(key, Some(val), mfp) => {
121                        if !mfp.is_identity() {
122                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
123                            ctx.indent += 1;
124                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
125                            ctx.indent += 1;
126                        }
127
128                        writeln!(f, "{}→Index Lookup on {id}{annotations}", ctx.indent)?;
129                        ctx.indent += 1;
130                        let key = CompactScalars(mode.seq(key, None));
131                        write!(f, "{}Key: ({key}) ", ctx.indent)?;
132                        let val = mode.expr(val, None);
133                        writeln!(f, "Value: {val}")?;
134                    }
135                    GetPlan::Arrangement(key, None, mfp) => {
136                        if !mfp.is_identity() {
137                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
138                            ctx.indent += 1;
139                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
140                            ctx.indent += 1;
141                        }
142
143                        writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
144                        ctx.indent += 1;
145                        let key = CompactScalars(mode.seq(key, None));
146                        writeln!(f, "{}Key: ({key})", ctx.indent)?;
147                    }
148                    GetPlan::Collection(mfp) => {
149                        if !mfp.is_identity() {
150                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
151                            ctx.indent += 1;
152                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
153                            ctx.indent += 1;
154                        }
155
156                        writeln!(f, "{}→Read {id}{annotations}", ctx.indent)?;
157                    }
158                }
159                ctx.indent.reset(); // reset the original indent level
160            }
161            Let { id, value, body } => {
162                let mut bindings = vec![(id, value.as_ref())];
163                let mut head = body.as_ref();
164
165                // Render Let-blocks nested in the body an outer Let-block in one step
166                // with a flattened list of bindings
167                while let Let { id, value, body } = &head.node {
168                    bindings.push((id, value.as_ref()));
169                    head = body.as_ref();
170                }
171
172                writeln!(f, "{}→With", ctx.indent)?;
173                ctx.indented(|ctx| {
174                    for (id, value) in bindings.iter() {
175                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
176                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
177                    }
178                    Ok(())
179                })?;
180                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
181                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
182            }
183            LetRec {
184                ids,
185                values,
186                limits,
187                body,
188            } => {
189                let head = body.as_ref();
190
191                writeln!(f, "{}→With Mutually Recursive", ctx.indent)?;
192                ctx.indented(|ctx| {
193                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
194                    for ((id, value), limit) in bindings {
195                        if let Some(limit) = limit {
196                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
197                        } else {
198                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
199                        }
200                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
201                    }
202                    Ok(())
203                })?;
204                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
205                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
206            }
207            Mfp {
208                input,
209                mfp,
210                input_key_val: _,
211            } => {
212                writeln!(f, "{}→Map/Filter/Project{annotations}", ctx.indent)?;
213                ctx.indent.set();
214
215                ctx.indent += 1;
216                mode.expr(mfp, None).fmt_default_text(f, ctx)?;
217
218                // one more nesting level if we showed anything for the MFP
219                if !mfp.is_identity() {
220                    ctx.indent += 1;
221                }
222                input.fmt_text(f, ctx)?;
223                ctx.indent.reset();
224            }
225            FlatMap {
226                input_key: _,
227                input,
228                exprs,
229                func,
230                mfp_after,
231            } => {
232                ctx.indent.set();
233                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
234                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
235                    ctx.indent += 1;
236                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
237                    ctx.indent += 1;
238                }
239
240                let exprs = mode.seq(exprs, None);
241                let exprs = CompactScalars(exprs);
242                writeln!(
243                    f,
244                    "{}→Table Function {func}({exprs}){annotations}",
245                    ctx.indent
246                )?;
247                ctx.indent += 1;
248
249                input.fmt_text(f, ctx)?;
250
251                ctx.indent.reset();
252            }
253            Join { inputs, plan } => {
254                use crate::plan::join::JoinPlan;
255                match plan {
256                    JoinPlan::Linear(plan) => {
257                        write!(f, "{}→Differential Join", ctx.indent)?;
258                        write!(f, " %{}", plan.source_relation)?;
259                        for dsp in &plan.stage_plans {
260                            write!(f, " » %{}", dsp.lookup_relation)?;
261                        }
262                        writeln!(f, "{annotations}")?;
263                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
264                    }
265                    JoinPlan::Delta(plan) => {
266                        write!(f, "{}→Delta Join", ctx.indent)?;
267                        for dpp in &plan.path_plans {
268                            write!(f, " [%{}", dpp.source_relation)?;
269
270                            for dsp in &dpp.stage_plans {
271                                write!(f, " » %{}", dsp.lookup_relation)?;
272                            }
273                            write!(f, "]")?;
274                        }
275                        writeln!(f, "{annotations}")?;
276                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
277                    }
278                }
279
280                ctx.indented(|ctx| {
281                    for input in inputs {
282                        input.fmt_text(f, ctx)?;
283                    }
284                    Ok(())
285                })?;
286            }
287            Reduce {
288                input_key: _,
289                input,
290                key_val_plan,
291                plan,
292                mfp_after,
293            } => {
294                ctx.indent.set();
295                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
296                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
297                    ctx.indent += 1;
298                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
299                    ctx.indent += 1;
300                }
301
302                use crate::plan::reduce::ReducePlan;
303                match plan {
304                    ReducePlan::Distinct => {
305                        writeln!(f, "{}→Distinct GroupAggregate{annotations}", ctx.indent)?;
306                    }
307                    ReducePlan::Accumulable(plan) => {
308                        writeln!(f, "{}→Accumulable GroupAggregate{annotations}", ctx.indent)?;
309                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
310                    }
311                    ReducePlan::Hierarchical(
312                        plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }),
313                    ) => {
314                        write!(
315                            f,
316                            "{}→Bucketed Hierarchical GroupAggregate (buckets: ",
317                            ctx.indent
318                        )?;
319                        for bucket in buckets {
320                            write!(f, " {bucket}")?;
321                        }
322                        writeln!(f, "){annotations}")?;
323                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
324                    }
325                    ReducePlan::Hierarchical(
326                        plan @ HierarchicalPlan::Monotonic(MonotonicPlan {
327                            must_consolidate, ..
328                        }),
329                    ) => {
330                        write!(f, "{}→", ctx.indent)?;
331                        if *must_consolidate {
332                            write!(f, "Consolidating ")?;
333                        }
334                        writeln!(f, "Monotonic GroupAggregate{annotations}",)?;
335                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
336                    }
337                    ReducePlan::Basic(plan) => {
338                        ctx.indent.set();
339                        if let BasicPlan::Single(SingleBasicPlan {
340                            fused_unnest_list, ..
341                        }) = &plan
342                        {
343                            if *fused_unnest_list {
344                                writeln!(
345                                    f,
346                                    "{}→Fused with Child Table Function unnest_list",
347                                    ctx.indent
348                                )?;
349                                ctx.indent += 1;
350                            }
351                        }
352                        writeln!(
353                            f,
354                            "{}→Non-incremental GroupAggregate{annotations}",
355                            ctx.indent
356                        )?;
357                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
358                        ctx.indent.reset();
359                    }
360                }
361
362                ctx.indented(|ctx| {
363                    let kvp = key_val_plan.key_plan.deref();
364                    if !kvp.is_identity() {
365                        writeln!(f, "{}Key:", ctx.indent)?;
366                        ctx.indented(|ctx| {
367                            let key_plan = mode.expr(kvp, None);
368                            key_plan.fmt_default_text(f, ctx)
369                        })?;
370                    }
371
372                    input.fmt_text(f, ctx)
373                })?;
374
375                ctx.indent.reset();
376            }
377            TopK { input, top_k_plan } => {
378                use crate::plan::top_k::TopKPlan;
379                match top_k_plan {
380                    TopKPlan::MonotonicTop1(plan) => {
381                        write!(f, "{}→", ctx.indent)?;
382                        if plan.must_consolidate {
383                            write!(f, "Consolidating ")?;
384                        }
385                        writeln!(f, "Monotonic Top1{annotations}")?;
386
387                        ctx.indented(|ctx| {
388                            if plan.group_key.len() > 0 {
389                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
390                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
391                            }
392                            if plan.order_key.len() > 0 {
393                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
394                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
395                            }
396                            Ok(())
397                        })?;
398                    }
399                    TopKPlan::MonotonicTopK(plan) => {
400                        write!(f, "{}→", ctx.indent)?;
401                        if plan.must_consolidate {
402                            write!(f, "Consolidating ")?;
403                        }
404                        writeln!(f, "Monotonic TopK{annotations}")?;
405
406                        ctx.indented(|ctx| {
407                            if plan.group_key.len() > 0 {
408                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
409                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
410                            }
411                            if plan.order_key.len() > 0 {
412                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
413                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
414                            }
415                            if let Some(limit) = &plan.limit {
416                                let limit = mode.expr(limit, None);
417                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
418                            }
419                            Ok(())
420                        })?;
421                    }
422                    TopKPlan::Basic(plan) => {
423                        writeln!(f, "{}→Non-monotonic TopK{annotations}", ctx.indent)?;
424
425                        ctx.indented(|ctx| {
426                            if plan.group_key.len() > 0 {
427                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
428                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
429                            }
430                            if plan.order_key.len() > 0 {
431                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
432                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
433                            }
434                            if let Some(limit) = &plan.limit {
435                                let limit = mode.expr(limit, None);
436                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
437                            }
438                            if plan.offset != 0 {
439                                let offset = plan.offset;
440                                writeln!(f, "{}Offset {offset}", ctx.indent)?;
441                            }
442                            Ok(())
443                        })?;
444                    }
445                }
446
447                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
448            }
449            Negate { input } => {
450                writeln!(f, "{}→Negate Diffs{annotations}", ctx.indent)?;
451
452                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
453            }
454            Threshold {
455                input,
456                threshold_plan,
457            } => {
458                match threshold_plan {
459                    ThresholdPlan::Basic(plan) => {
460                        write!(f, "{}→Threshold Diffs ", ctx.indent)?;
461                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
462                        ensure_arrangement.fmt_text(f, ctx)?;
463                        writeln!(f, "{annotations}")?;
464                    }
465                };
466
467                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
468            }
469            Union {
470                inputs,
471                consolidate_output,
472            } => {
473                write!(f, "{}→", ctx.indent)?;
474                if *consolidate_output {
475                    write!(f, "Consolidating ")?;
476                }
477                writeln!(f, "Union{annotations}")?;
478
479                ctx.indented(|ctx| {
480                    for input in inputs.iter() {
481                        input.fmt_text(f, ctx)?;
482                    }
483                    Ok(())
484                })?;
485            }
486            ArrangeBy {
487                input_key: _,
488                input,
489                input_mfp,
490                forms,
491                strategy: _,
492            } => {
493                ctx.indent.set();
494                if forms.raw && forms.arranged.is_empty() {
495                    soft_assert_or_log!(forms.raw, "raw stream with no arrangements");
496                    writeln!(f, "{}→Unarranged Raw Stream{annotations}", ctx.indent)?;
497                } else {
498                    write!(f, "{}→Arrange", ctx.indent)?;
499
500                    if !forms.arranged.is_empty() {
501                        let mode = HumanizedExplain::new(ctx.config.redacted);
502                        for (key, _, _) in &forms.arranged {
503                            if !key.is_empty() {
504                                let key = mode.seq(key, None);
505                                let key = CompactScalars(key);
506                                write!(f, " ({key})")?;
507                            } else {
508                                write!(f, " (empty key)")?;
509                            }
510                        }
511                    }
512                    writeln!(f, "{annotations}")?;
513                }
514
515                if !input_mfp.is_identity() {
516                    ctx.indent += 1;
517                    writeln!(f, "{}→Fused with Parent Map/Filter/Project", ctx.indent)?;
518                    ctx.indented(|ctx| mode.expr(input_mfp, None).fmt_default_text(f, ctx))?;
519                }
520
521                ctx.indent += 1;
522                input.fmt_text(f, ctx)?;
523                ctx.indent.reset();
524            }
525        }
526
527        Ok(())
528    }
529
530    fn fmt_verbose_text(
531        &self,
532        f: &mut fmt::Formatter<'_>,
533        ctx: &mut PlanRenderingContext<'_, Plan>,
534    ) -> fmt::Result {
535        use PlanNode::*;
536
537        let mode = HumanizedExplain::new(ctx.config.redacted);
538        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
539
540        match &self.node {
541            Constant { rows } => match rows {
542                Ok(rows) => {
543                    if !rows.is_empty() {
544                        writeln!(f, "{}Constant{}", ctx.indent, annotations)?;
545                        ctx.indented(|ctx| {
546                            fmt_text_constant_rows(
547                                f,
548                                rows.iter().map(|(data, _, diff)| (data, diff)),
549                                &mut ctx.indent,
550                                ctx.config.redacted,
551                            )
552                        })?;
553                    } else {
554                        writeln!(f, "{}Constant <empty>{}", ctx.indent, annotations)?;
555                    }
556                }
557                Err(err) => {
558                    if mode.redacted() {
559                        writeln!(f, "{}Error █{}", ctx.indent, annotations)?;
560                    } else {
561                        {
562                            writeln!(
563                                f,
564                                "{}Error {}{}",
565                                ctx.indent,
566                                err.to_string().quoted(),
567                                annotations
568                            )?;
569                        }
570                    }
571                }
572            },
573
574            Get { id, keys, plan } => {
575                ctx.indent.set(); // mark the current indent level
576
577                // Resolve the id as a string.
578                let id = match id {
579                    Id::Local(id) => id.to_string(),
580                    Id::Global(id) => ctx
581                        .humanizer
582                        .humanize_id(*id)
583                        .unwrap_or_else(|| id.to_string()),
584                };
585                // Render plan-specific fields.
586                use crate::plan::GetPlan;
587                match plan {
588                    GetPlan::PassArrangements => {
589                        writeln!(
590                            f,
591                            "{}Get::PassArrangements {}{}",
592                            ctx.indent, id, annotations
593                        )?;
594                        ctx.indent += 1;
595                    }
596                    GetPlan::Arrangement(key, val, mfp) => {
597                        writeln!(f, "{}Get::Arrangement {}{}", ctx.indent, id, annotations)?;
598                        ctx.indent += 1;
599                        mode.expr(mfp, None).fmt_text(f, ctx)?;
600                        {
601                            let key = mode.seq(key, None);
602                            let key = CompactScalars(key);
603                            writeln!(f, "{}key={}", ctx.indent, key)?;
604                        }
605                        if let Some(val) = val {
606                            let val = mode.expr(val, None);
607                            writeln!(f, "{}val={}", ctx.indent, val)?;
608                        }
609                    }
610                    GetPlan::Collection(mfp) => {
611                        writeln!(f, "{}Get::Collection {}{}", ctx.indent, id, annotations)?;
612                        ctx.indent += 1;
613                        mode.expr(mfp, None).fmt_text(f, ctx)?;
614                    }
615                }
616
617                // Render plan-agnostic fields (common for all plans for this variant).
618                keys.fmt_text(f, ctx)?;
619
620                ctx.indent.reset(); // reset the original indent level
621            }
622            Let { id, value, body } => {
623                let mut bindings = vec![(id, value.as_ref())];
624                let mut head = body.as_ref();
625
626                // Render Let-blocks nested in the body an outer Let-block in one step
627                // with a flattened list of bindings
628                while let Let { id, value, body } = &head.node {
629                    bindings.push((id, value.as_ref()));
630                    head = body.as_ref();
631                }
632
633                writeln!(f, "{}With", ctx.indent)?;
634                ctx.indented(|ctx| {
635                    for (id, value) in bindings.iter() {
636                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
637                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
638                    }
639                    Ok(())
640                })?;
641                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
642                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
643            }
644            LetRec {
645                ids,
646                values,
647                limits,
648                body,
649            } => {
650                let head = body.as_ref();
651
652                writeln!(f, "{}With Mutually Recursive", ctx.indent)?;
653                ctx.indented(|ctx| {
654                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
655                    for ((id, value), limit) in bindings {
656                        if let Some(limit) = limit {
657                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
658                        } else {
659                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
660                        }
661                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
662                    }
663                    Ok(())
664                })?;
665                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
666                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
667            }
668            Mfp {
669                input,
670                mfp,
671                input_key_val,
672            } => {
673                writeln!(f, "{}Mfp{}", ctx.indent, annotations)?;
674                ctx.indented(|ctx| {
675                    mode.expr(mfp, None).fmt_text(f, ctx)?;
676                    if let Some((key, val)) = input_key_val {
677                        {
678                            let key = mode.seq(key, None);
679                            let key = CompactScalars(key);
680                            writeln!(f, "{}input_key={}", ctx.indent, key)?;
681                        }
682                        if let Some(val) = val {
683                            let val = mode.expr(val, None);
684                            writeln!(f, "{}input_val={}", ctx.indent, val)?;
685                        }
686                    }
687                    input.fmt_text(f, ctx)
688                })?;
689            }
690            FlatMap {
691                input_key,
692                input,
693                exprs,
694                func,
695                mfp_after,
696            } => {
697                let exprs = mode.seq(exprs, None);
698                let exprs = CompactScalars(exprs);
699                writeln!(
700                    f,
701                    "{}FlatMap {}({}){}",
702                    ctx.indent, func, exprs, annotations
703                )?;
704                ctx.indented(|ctx| {
705                    if let Some(key) = input_key {
706                        let key = mode.seq(key, None);
707                        let key = CompactScalars(key);
708                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
709                    }
710                    if !mfp_after.is_identity() {
711                        writeln!(f, "{}mfp_after", ctx.indent)?;
712                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
713                    }
714                    input.fmt_text(f, ctx)
715                })?;
716            }
717            Join { inputs, plan } => {
718                use crate::plan::join::JoinPlan;
719                match plan {
720                    JoinPlan::Linear(plan) => {
721                        writeln!(f, "{}Join::Linear{}", ctx.indent, annotations)?;
722                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
723                    }
724                    JoinPlan::Delta(plan) => {
725                        writeln!(f, "{}Join::Delta{}", ctx.indent, annotations)?;
726                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
727                    }
728                }
729                ctx.indented(|ctx| {
730                    for input in inputs {
731                        input.fmt_text(f, ctx)?;
732                    }
733                    Ok(())
734                })?;
735            }
736            Reduce {
737                input_key,
738                input,
739                key_val_plan,
740                plan,
741                mfp_after,
742            } => {
743                use crate::plan::reduce::ReducePlan;
744                match plan {
745                    ReducePlan::Distinct => {
746                        writeln!(f, "{}Reduce::Distinct{}", ctx.indent, annotations)?;
747                    }
748                    ReducePlan::Accumulable(plan) => {
749                        writeln!(f, "{}Reduce::Accumulable{}", ctx.indent, annotations)?;
750                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
751                    }
752                    ReducePlan::Hierarchical(plan) => {
753                        writeln!(f, "{}Reduce::Hierarchical{}", ctx.indent, annotations)?;
754                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
755                    }
756                    ReducePlan::Basic(plan) => {
757                        writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
758                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
759                    }
760                }
761                ctx.indented(|ctx| {
762                    if let Some(key) = input_key {
763                        let key = mode.seq(key, None);
764                        let key = CompactScalars(key);
765                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
766                    }
767                    if key_val_plan.key_plan.deref().is_identity() {
768                        writeln!(f, "{}key_plan=id", ctx.indent)?;
769                    } else {
770                        writeln!(f, "{}key_plan", ctx.indent)?;
771                        ctx.indented(|ctx| {
772                            let key_plan = mode.expr(key_val_plan.key_plan.deref(), None);
773                            key_plan.fmt_text(f, ctx)
774                        })?;
775                    }
776                    if key_val_plan.val_plan.deref().is_identity() {
777                        writeln!(f, "{}val_plan=id", ctx.indent)?;
778                    } else {
779                        writeln!(f, "{}val_plan", ctx.indent)?;
780                        ctx.indented(|ctx| {
781                            let val_plan = mode.expr(key_val_plan.val_plan.deref(), None);
782                            val_plan.fmt_text(f, ctx)
783                        })?;
784                    }
785                    if !mfp_after.is_identity() {
786                        writeln!(f, "{}mfp_after", ctx.indent)?;
787                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
788                    }
789
790                    input.fmt_text(f, ctx)
791                })?;
792            }
793            TopK { input, top_k_plan } => {
794                use crate::plan::top_k::TopKPlan;
795                match top_k_plan {
796                    TopKPlan::MonotonicTop1(plan) => {
797                        write!(f, "{}TopK::MonotonicTop1", ctx.indent)?;
798                        if plan.group_key.len() > 0 {
799                            let group_by = mode.seq(&plan.group_key, None);
800                            let group_by = CompactScalars(group_by);
801                            write!(f, " group_by=[{}]", group_by)?;
802                        }
803                        if plan.order_key.len() > 0 {
804                            let order_by = mode.seq(&plan.order_key, None);
805                            let order_by = separated(", ", order_by);
806                            write!(f, " order_by=[{}]", order_by)?;
807                        }
808                        if plan.must_consolidate {
809                            write!(f, " must_consolidate")?;
810                        }
811                    }
812                    TopKPlan::MonotonicTopK(plan) => {
813                        write!(f, "{}TopK::MonotonicTopK", ctx.indent)?;
814                        if plan.group_key.len() > 0 {
815                            let group_by = mode.seq(&plan.group_key, None);
816                            let group_by = CompactScalars(group_by);
817                            write!(f, " group_by=[{}]", group_by)?;
818                        }
819                        if plan.order_key.len() > 0 {
820                            let order_by = mode.seq(&plan.order_key, None);
821                            let order_by = separated(", ", order_by);
822                            write!(f, " order_by=[{}]", order_by)?;
823                        }
824                        if let Some(limit) = &plan.limit {
825                            let limit = mode.expr(limit, None);
826                            write!(f, " limit={}", limit)?;
827                        }
828                        if plan.must_consolidate {
829                            write!(f, " must_consolidate")?;
830                        }
831                    }
832                    TopKPlan::Basic(plan) => {
833                        write!(f, "{}TopK::Basic", ctx.indent)?;
834                        if plan.group_key.len() > 0 {
835                            let group_by = mode.seq(&plan.group_key, None);
836                            let group_by = CompactScalars(group_by);
837                            write!(f, " group_by=[{}]", group_by)?;
838                        }
839                        if plan.order_key.len() > 0 {
840                            let order_by = mode.seq(&plan.order_key, None);
841                            let order_by = separated(", ", order_by);
842                            write!(f, " order_by=[{}]", order_by)?;
843                        }
844                        if let Some(limit) = &plan.limit {
845                            let limit = mode.expr(limit, None);
846                            write!(f, " limit={}", limit)?;
847                        }
848                        if &plan.offset > &0 {
849                            write!(f, " offset={}", plan.offset)?;
850                        }
851                    }
852                }
853                writeln!(f, "{}", annotations)?;
854                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
855            }
856            Negate { input } => {
857                writeln!(f, "{}Negate{}", ctx.indent, annotations)?;
858                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
859            }
860            Threshold {
861                input,
862                threshold_plan,
863            } => {
864                use crate::plan::threshold::ThresholdPlan;
865                match threshold_plan {
866                    ThresholdPlan::Basic(plan) => {
867                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
868                        write!(f, "{}Threshold::Basic", ctx.indent)?;
869                        write!(f, " ensure_arrangement=")?;
870                        ensure_arrangement.fmt_text(f, ctx)?;
871                        writeln!(f, "{}", annotations)?;
872                    }
873                };
874                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
875            }
876            Union {
877                inputs,
878                consolidate_output,
879            } => {
880                if *consolidate_output {
881                    writeln!(
882                        f,
883                        "{}Union consolidate_output={}{}",
884                        ctx.indent, consolidate_output, annotations
885                    )?;
886                } else {
887                    writeln!(f, "{}Union{}", ctx.indent, annotations)?;
888                }
889                ctx.indented(|ctx| {
890                    for input in inputs.iter() {
891                        input.fmt_text(f, ctx)?;
892                    }
893                    Ok(())
894                })?;
895            }
896            ArrangeBy {
897                input_key,
898                input,
899                input_mfp,
900                forms,
901                strategy,
902            } => {
903                writeln!(f, "{}ArrangeBy{}", ctx.indent, annotations)?;
904                ctx.indented(|ctx| {
905                    if let Some(key) = input_key {
906                        let key = mode.seq(key, None);
907                        let key = CompactScalars(key);
908                        writeln!(f, "{}input_key=[{}]", ctx.indent, key)?;
909                    }
910                    if !matches!(strategy, ArrangementStrategy::Direct) {
911                        writeln!(f, "{}strategy={:?}", ctx.indent, strategy)?;
912                    }
913                    mode.expr(input_mfp, None).fmt_text(f, ctx)?;
914                    forms.fmt_text(f, ctx)?;
915                    // Render input
916                    input.fmt_text(f, ctx)
917                })?;
918            }
919        }
920
921        Ok(())
922    }
923}
924
925impl DisplayText<PlanRenderingContext<'_, Plan>> for AvailableCollections {
926    fn fmt_text(
927        &self,
928        f: &mut fmt::Formatter<'_>,
929        ctx: &mut PlanRenderingContext<'_, Plan>,
930    ) -> fmt::Result {
931        if ctx.config.verbose_syntax {
932            self.fmt_verbose_text(f, ctx)
933        } else {
934            self.fmt_default_text(f, ctx)
935        }
936    }
937}
938impl AvailableCollections {
939    fn fmt_default_text(
940        &self,
941        f: &mut fmt::Formatter<'_>,
942        ctx: &mut PlanRenderingContext<'_, Plan>,
943    ) -> fmt::Result {
944        let plural = if self.arranged.len() == 1 { "" } else { "s" };
945        write!(
946            f,
947            "{}Keys: {} arrangement{plural} available",
948            ctx.indent,
949            self.arranged.len()
950        )?;
951
952        if self.raw {
953            writeln!(f, ", plus raw stream")?;
954        } else {
955            writeln!(f, ", no raw stream")?;
956        }
957
958        ctx.indented(|ctx| {
959            for (i, arrangement) in self.arranged.iter().enumerate() {
960                let arrangement = Arrangement::from(arrangement);
961                write!(f, "{}Arrangement {i}: ", ctx.indent)?;
962                arrangement.fmt_text(f, ctx)?;
963                writeln!(f, "")?;
964            }
965            Ok(())
966        })?;
967
968        Ok(())
969    }
970
971    fn fmt_verbose_text(
972        &self,
973        f: &mut fmt::Formatter<'_>,
974        ctx: &mut PlanRenderingContext<'_, Plan>,
975    ) -> fmt::Result {
976        // raw field
977        let raw = &self.raw;
978        writeln!(f, "{}raw={}", ctx.indent, raw)?;
979        // arranged field
980        for (i, arrangement) in self.arranged.iter().enumerate() {
981            let arrangement = Arrangement::from(arrangement);
982            write!(f, "{}arrangements[{}]=", ctx.indent, i)?;
983            arrangement.fmt_text(f, ctx)?;
984            writeln!(f, "")?;
985        }
986        Ok(())
987    }
988}
989
990impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearJoinPlan {
991    fn fmt_text(
992        &self,
993        f: &mut fmt::Formatter<'_>,
994        ctx: &mut PlanRenderingContext<'_, Plan>,
995    ) -> fmt::Result {
996        if ctx.config.verbose_syntax {
997            self.fmt_verbose_text(f, ctx)
998        } else {
999            self.fmt_default_text(f, ctx)
1000        }
1001    }
1002}
1003impl LinearJoinPlan {
1004    #[allow(clippy::needless_pass_by_ref_mut)]
1005    fn fmt_default_text(
1006        &self,
1007        f: &mut fmt::Formatter<'_>,
1008        ctx: &mut PlanRenderingContext<'_, Plan>,
1009    ) -> fmt::Result {
1010        for (i, plan) in self.stage_plans.iter().enumerate().rev() {
1011            let lookup_relation = &plan.lookup_relation;
1012            write!(f, "{}Join stage {i} in %{lookup_relation}", ctx.indent)?;
1013            if !plan.lookup_key.is_empty() {
1014                let lookup_key = CompactScalarSeq(&plan.lookup_key);
1015                writeln!(f, " with lookup key {lookup_key}",)?;
1016            } else {
1017                writeln!(f)?;
1018            }
1019            if plan.closure.maps_or_filters() {
1020                ctx.indented(|ctx| plan.closure.fmt_default_text(f, ctx))?;
1021            }
1022        }
1023        if let Some(final_closure) = &self.final_closure {
1024            if final_closure.maps_or_filters() {
1025                ctx.indented(|ctx| {
1026                    writeln!(f, "{}Final closure:", ctx.indent)?;
1027                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1028                })?;
1029            }
1030        }
1031        Ok(())
1032    }
1033
1034    fn fmt_verbose_text(
1035        &self,
1036        f: &mut fmt::Formatter<'_>,
1037        ctx: &mut PlanRenderingContext<'_, Plan>,
1038    ) -> fmt::Result {
1039        let mode = HumanizedExplain::new(ctx.config.redacted);
1040        let plan = self;
1041        if let Some(closure) = plan.final_closure.as_ref() {
1042            if !closure.is_identity() {
1043                writeln!(f, "{}final_closure", ctx.indent)?;
1044                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1045            }
1046        }
1047        for (i, plan) in plan.stage_plans.iter().enumerate() {
1048            writeln!(f, "{}linear_stage[{}]", ctx.indent, i)?;
1049            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1050        }
1051        if let Some(closure) = plan.initial_closure.as_ref() {
1052            if !closure.is_identity() {
1053                writeln!(f, "{}initial_closure", ctx.indent)?;
1054                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1055            }
1056        }
1057        match &plan.source_key {
1058            Some(source_key) => {
1059                let source_key = mode.seq(source_key, None);
1060                let source_key = CompactScalars(source_key);
1061                writeln!(
1062                    f,
1063                    "{}source={{ relation={}, key=[{}] }}",
1064                    ctx.indent, &plan.source_relation, source_key
1065                )?
1066            }
1067            None => writeln!(
1068                f,
1069                "{}source={{ relation={}, key=[] }}",
1070                ctx.indent, &plan.source_relation
1071            )?,
1072        };
1073        Ok(())
1074    }
1075}
1076
1077impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearStagePlan {
1078    fn fmt_text(
1079        &self,
1080        f: &mut fmt::Formatter<'_>,
1081        ctx: &mut PlanRenderingContext<'_, Plan>,
1082    ) -> fmt::Result {
1083        if ctx.config.verbose_syntax {
1084            self.fmt_verbose_text(f, ctx)
1085        } else {
1086            self.fmt_default_text(f, ctx)
1087        }
1088    }
1089}
1090impl LinearStagePlan {
1091    #[allow(clippy::needless_pass_by_ref_mut)]
1092    fn fmt_default_text(
1093        &self,
1094        f: &mut fmt::Formatter<'_>,
1095        ctx: &mut PlanRenderingContext<'_, Plan>,
1096    ) -> fmt::Result {
1097        // NB this code path should not be live, as fmt_default_text for
1098        // `LinearJoinPlan` prints out each stage already
1099        let lookup_relation = &self.lookup_relation;
1100        if !self.lookup_key.is_empty() {
1101            let lookup_key = CompactScalarSeq(&self.lookup_key);
1102            writeln!(
1103                f,
1104                "{}Lookup key {lookup_key} in %{lookup_relation}",
1105                ctx.indent
1106            )
1107        } else {
1108            writeln!(f, "{}Lookup in %{lookup_relation}", ctx.indent)
1109        }
1110    }
1111
1112    fn fmt_verbose_text(
1113        &self,
1114        f: &mut fmt::Formatter<'_>,
1115        ctx: &mut PlanRenderingContext<'_, Plan>,
1116    ) -> fmt::Result {
1117        let mode = HumanizedExplain::new(ctx.config.redacted);
1118
1119        let plan = self;
1120        if !plan.closure.is_identity() {
1121            writeln!(f, "{}closure", ctx.indent)?;
1122            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1123        }
1124        {
1125            let lookup_relation = &plan.lookup_relation;
1126            let lookup_key = CompactScalarSeq(&plan.lookup_key);
1127            writeln!(
1128                f,
1129                "{}lookup={{ relation={}, key=[{}] }}",
1130                ctx.indent, lookup_relation, lookup_key
1131            )?;
1132        }
1133        {
1134            let stream_key = mode.seq(&plan.stream_key, None);
1135            let stream_key = CompactScalars(stream_key);
1136            let stream_thinning = Indices(&plan.stream_thinning);
1137            writeln!(
1138                f,
1139                "{}stream={{ key=[{}], thinning=({}) }}",
1140                ctx.indent, stream_key, stream_thinning
1141            )?;
1142        }
1143        Ok(())
1144    }
1145}
1146
1147impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaJoinPlan {
1148    fn fmt_text(
1149        &self,
1150        f: &mut fmt::Formatter<'_>,
1151        ctx: &mut PlanRenderingContext<'_, Plan>,
1152    ) -> fmt::Result {
1153        if ctx.config.verbose_syntax {
1154            self.fmt_verbose_text(f, ctx)
1155        } else {
1156            self.fmt_default_text(f, ctx)
1157        }
1158    }
1159}
1160impl DeltaJoinPlan {
1161    fn fmt_default_text(
1162        &self,
1163        f: &mut fmt::Formatter<'_>,
1164        ctx: &mut PlanRenderingContext<'_, Plan>,
1165    ) -> fmt::Result {
1166        for (i, plan) in self.path_plans.iter().enumerate() {
1167            writeln!(f, "{}Delta join path for input %{i}", ctx.indent)?;
1168            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1169        }
1170        Ok(())
1171    }
1172
1173    fn fmt_verbose_text(
1174        &self,
1175        f: &mut fmt::Formatter<'_>,
1176        ctx: &mut PlanRenderingContext<'_, Plan>,
1177    ) -> fmt::Result {
1178        for (i, plan) in self.path_plans.iter().enumerate() {
1179            writeln!(f, "{}plan_path[{}]", ctx.indent, i)?;
1180            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1181        }
1182        Ok(())
1183    }
1184}
1185
1186impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaPathPlan {
1187    fn fmt_text(
1188        &self,
1189        f: &mut fmt::Formatter<'_>,
1190        ctx: &mut PlanRenderingContext<'_, Plan>,
1191    ) -> fmt::Result {
1192        if ctx.config.verbose_syntax {
1193            self.fmt_verbose_text(f, ctx)
1194        } else {
1195            self.fmt_default_text(f, ctx)
1196        }
1197    }
1198}
1199
1200impl DeltaPathPlan {
1201    #[allow(clippy::needless_pass_by_ref_mut)]
1202    fn fmt_default_text(
1203        &self,
1204        f: &mut fmt::Formatter<'_>,
1205        ctx: &mut PlanRenderingContext<'_, Plan>,
1206    ) -> fmt::Result {
1207        for (
1208            i,
1209            DeltaStagePlan {
1210                lookup_key,
1211                lookup_relation,
1212                closure,
1213                ..
1214            },
1215        ) in self.stage_plans.iter().enumerate()
1216        {
1217            if !lookup_key.is_empty() {
1218                let lookup_key = CompactScalarSeq(lookup_key);
1219                writeln!(
1220                    f,
1221                    "{}stage {i} for %{lookup_relation}: lookup key {lookup_key}",
1222                    ctx.indent
1223                )?;
1224            } else {
1225                writeln!(f, "{}stage %{i} for  %{lookup_relation}", ctx.indent)?;
1226            }
1227            if closure.maps_or_filters() {
1228                ctx.indented(|ctx| closure.fmt_default_text(f, ctx))?;
1229            }
1230        }
1231        if let Some(final_closure) = &self.final_closure {
1232            if final_closure.maps_or_filters() {
1233                ctx.indented(|ctx| {
1234                    writeln!(f, "{}Final closure:", ctx.indent)?;
1235                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1236                })?;
1237            }
1238        }
1239        Ok(())
1240    }
1241
1242    fn fmt_verbose_text(
1243        &self,
1244        f: &mut fmt::Formatter<'_>,
1245        ctx: &mut PlanRenderingContext<'_, Plan>,
1246    ) -> fmt::Result {
1247        let mode = HumanizedExplain::new(ctx.config.redacted);
1248        let plan = self;
1249        if let Some(closure) = plan.final_closure.as_ref() {
1250            if !closure.is_identity() {
1251                writeln!(f, "{}final_closure", ctx.indent)?;
1252                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1253            }
1254        }
1255        for (i, plan) in plan.stage_plans.iter().enumerate().rev() {
1256            writeln!(f, "{}delta_stage[{}]", ctx.indent, i)?;
1257            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1258        }
1259        if !plan.initial_closure.is_identity() {
1260            writeln!(f, "{}initial_closure", ctx.indent)?;
1261            ctx.indented(|ctx| plan.initial_closure.fmt_text(f, ctx))?;
1262        }
1263        {
1264            let source_relation = &plan.source_relation;
1265            let source_key = mode.seq(&plan.source_key, None);
1266            let source_key = CompactScalars(source_key);
1267            writeln!(
1268                f,
1269                "{}source={{ relation={}, key=[{}] }}",
1270                ctx.indent, source_relation, source_key
1271            )?;
1272        }
1273        Ok(())
1274    }
1275}
1276
1277impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaStagePlan {
1278    fn fmt_text(
1279        &self,
1280        f: &mut fmt::Formatter<'_>,
1281        ctx: &mut PlanRenderingContext<'_, Plan>,
1282    ) -> fmt::Result {
1283        if ctx.config.verbose_syntax {
1284            self.fmt_verbose_text(f, ctx)
1285        } else {
1286            self.fmt_default_text(f, ctx)
1287        }
1288    }
1289}
1290impl DeltaStagePlan {
1291    #[allow(clippy::needless_pass_by_ref_mut)]
1292    fn fmt_default_text(
1293        &self,
1294        f: &mut fmt::Formatter<'_>,
1295        ctx: &mut PlanRenderingContext<'_, Plan>,
1296    ) -> fmt::Result {
1297        // NB this code path should not be live, as fmt_default_text for
1298        // `DeltaPathPlan` prints out each stage already
1299        let lookup_relation = &self.lookup_relation;
1300        let lookup_key = CompactScalarSeq(&self.lookup_key);
1301        writeln!(
1302            f,
1303            "{}Lookup key {lookup_key} in %{lookup_relation}",
1304            ctx.indent
1305        )
1306    }
1307
1308    fn fmt_verbose_text(
1309        &self,
1310        f: &mut fmt::Formatter<'_>,
1311        ctx: &mut PlanRenderingContext<'_, Plan>,
1312    ) -> fmt::Result {
1313        let mode = HumanizedExplain::new(ctx.config.redacted);
1314        let plan = self;
1315        if !plan.closure.is_identity() {
1316            writeln!(f, "{}closure", ctx.indent)?;
1317            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1318        }
1319        {
1320            let lookup_relation = &plan.lookup_relation;
1321            let lookup_key = mode.seq(&plan.lookup_key, None);
1322            let lookup_key = CompactScalars(lookup_key);
1323            writeln!(
1324                f,
1325                "{}lookup={{ relation={}, key=[{}] }}",
1326                ctx.indent, lookup_relation, lookup_key
1327            )?;
1328        }
1329        {
1330            let stream_key = mode.seq(&plan.stream_key, None);
1331            let stream_key = CompactScalars(stream_key);
1332            let stream_thinning = mode.seq(&plan.stream_thinning, None);
1333            let stream_thinning = CompactScalars(stream_thinning);
1334            writeln!(
1335                f,
1336                "{}stream={{ key=[{}], thinning=({}) }}",
1337                ctx.indent, stream_key, stream_thinning
1338            )?;
1339        }
1340        Ok(())
1341    }
1342}
1343
1344impl DisplayText<PlanRenderingContext<'_, Plan>> for JoinClosure {
1345    fn fmt_text(
1346        &self,
1347        f: &mut fmt::Formatter<'_>,
1348        ctx: &mut PlanRenderingContext<'_, Plan>,
1349    ) -> fmt::Result {
1350        if ctx.config.verbose_syntax {
1351            self.fmt_verbose_text(f, ctx)
1352        } else {
1353            self.fmt_default_text(f, ctx)
1354        }
1355    }
1356}
1357impl JoinClosure {
1358    fn fmt_default_text(
1359        &self,
1360        f: &mut fmt::Formatter<'_>,
1361        ctx: &mut PlanRenderingContext<'_, Plan>,
1362    ) -> fmt::Result {
1363        let mode = HumanizedExplain::new(ctx.config.redacted);
1364        if !self.before.expressions.is_empty() || !self.before.predicates.is_empty() {
1365            mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1366        }
1367        if !self.ready_equivalences.is_empty() {
1368            let equivalences = separated(
1369                " AND ",
1370                self.ready_equivalences
1371                    .iter()
1372                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1373            );
1374            writeln!(f, "{}Equivalences: {equivalences}", ctx.indent)?;
1375        }
1376        Ok(())
1377    }
1378
1379    fn fmt_verbose_text(
1380        &self,
1381        f: &mut fmt::Formatter<'_>,
1382        ctx: &mut PlanRenderingContext<'_, Plan>,
1383    ) -> fmt::Result {
1384        let mode = HumanizedExplain::new(ctx.config.redacted);
1385        mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1386        if !self.ready_equivalences.is_empty() {
1387            let equivalences = separated(
1388                " AND ",
1389                self.ready_equivalences
1390                    .iter()
1391                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1392            );
1393            writeln!(f, "{}ready_equivalences={}", ctx.indent, equivalences)?;
1394        }
1395        Ok(())
1396    }
1397}
1398
1399impl DisplayText<PlanRenderingContext<'_, Plan>> for AccumulablePlan {
1400    fn fmt_text(
1401        &self,
1402        f: &mut fmt::Formatter<'_>,
1403        ctx: &mut PlanRenderingContext<'_, Plan>,
1404    ) -> fmt::Result {
1405        if ctx.config.verbose_syntax {
1406            self.fmt_verbose_text(f, ctx)
1407        } else {
1408            self.fmt_default_text(f, ctx)
1409        }
1410    }
1411}
1412impl AccumulablePlan {
1413    #[allow(clippy::needless_pass_by_ref_mut)]
1414    fn fmt_default_text(
1415        &self,
1416        f: &mut fmt::Formatter<'_>,
1417        ctx: &mut PlanRenderingContext<'_, Plan>,
1418    ) -> fmt::Result {
1419        let mode = HumanizedExplain::new(ctx.config.redacted);
1420
1421        if !self.simple_aggrs.is_empty() {
1422            let simple_aggrs = self
1423                .simple_aggrs
1424                .iter()
1425                .map(|(_i_datum, agg)| mode.expr(agg, None));
1426            let simple_aggrs = separated(", ", simple_aggrs);
1427            writeln!(f, "{}Simple aggregates: {simple_aggrs}", ctx.indent)?;
1428        }
1429
1430        if !self.distinct_aggrs.is_empty() {
1431            let distinct_aggrs = self
1432                .distinct_aggrs
1433                .iter()
1434                .map(|(_i_datum, agg)| mode.expr(agg, None));
1435            let distinct_aggrs = separated(", ", distinct_aggrs);
1436            writeln!(f, "{}Distinct aggregates: {distinct_aggrs}", ctx.indent)?;
1437        }
1438        Ok(())
1439    }
1440
1441    #[allow(clippy::needless_pass_by_ref_mut)]
1442    fn fmt_verbose_text(
1443        &self,
1444        f: &mut fmt::Formatter<'_>,
1445        ctx: &mut PlanRenderingContext<'_, Plan>,
1446    ) -> fmt::Result {
1447        let mode = HumanizedExplain::new(ctx.config.redacted);
1448        // full_aggrs (skipped because they are repeated in simple_aggrs ∪ distinct_aggrs)
1449        // for (i, aggr) in self.full_aggrs.iter().enumerate() {
1450        //     write!(f, "{}full_aggrs[{}]=", ctx.indent, i)?;
1451        //     aggr.fmt_text(f, &mut ())?;
1452        //     writeln!(f)?;
1453        // }
1454        // simple_aggrs
1455        for (i, (i_datum, agg)) in self.simple_aggrs.iter().enumerate() {
1456            let agg = mode.expr(agg, None);
1457            write!(f, "{}simple_aggrs[{}]=", ctx.indent, i)?;
1458            writeln!(f, "({}, {})", i_datum, agg)?;
1459        }
1460        // distinct_aggrs
1461        for (i, (i_datum, agg)) in self.distinct_aggrs.iter().enumerate() {
1462            let agg = mode.expr(agg, None);
1463            write!(f, "{}distinct_aggrs[{}]=", ctx.indent, i)?;
1464            writeln!(f, "({}, {})", i_datum, agg)?;
1465        }
1466        Ok(())
1467    }
1468}
1469
1470impl DisplayText<PlanRenderingContext<'_, Plan>> for HierarchicalPlan {
1471    fn fmt_text(
1472        &self,
1473        f: &mut fmt::Formatter<'_>,
1474        ctx: &mut PlanRenderingContext<'_, Plan>,
1475    ) -> fmt::Result {
1476        if ctx.config.verbose_syntax {
1477            self.fmt_verbose_text(f, ctx)
1478        } else {
1479            self.fmt_default_text(f, ctx)
1480        }
1481    }
1482}
1483impl HierarchicalPlan {
1484    #[allow(clippy::needless_pass_by_ref_mut)]
1485    fn fmt_default_text(
1486        &self,
1487        f: &mut fmt::Formatter<'_>,
1488        ctx: &mut PlanRenderingContext<'_, Plan>,
1489    ) -> fmt::Result {
1490        let mode = HumanizedExplain::new(ctx.config.redacted);
1491        let aggr_funcs = mode.seq(self.aggr_funcs(), None);
1492        let aggr_funcs = separated(", ", aggr_funcs);
1493        writeln!(f, "{}Aggregations: {aggr_funcs}", ctx.indent)
1494    }
1495
1496    #[allow(clippy::needless_pass_by_ref_mut)]
1497    fn fmt_verbose_text(
1498        &self,
1499        f: &mut fmt::Formatter<'_>,
1500        ctx: &mut PlanRenderingContext<'_, Plan>,
1501    ) -> fmt::Result {
1502        let mode = HumanizedExplain::new(ctx.config.redacted);
1503        match self {
1504            HierarchicalPlan::Monotonic(plan) => {
1505                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1506                let aggr_funcs = separated(", ", aggr_funcs);
1507                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1508                writeln!(f, "{}monotonic", ctx.indent)?;
1509                if plan.must_consolidate {
1510                    writeln!(f, "{}must_consolidate", ctx.indent)?;
1511                }
1512            }
1513            HierarchicalPlan::Bucketed(plan) => {
1514                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1515                let aggr_funcs = separated(", ", aggr_funcs);
1516                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1517                let buckets = separated(", ", &plan.buckets);
1518                writeln!(f, "{}buckets=[{}]", ctx.indent, buckets)?;
1519            }
1520        }
1521        Ok(())
1522    }
1523}
1524
1525impl DisplayText<PlanRenderingContext<'_, Plan>> for BasicPlan {
1526    fn fmt_text(
1527        &self,
1528        f: &mut fmt::Formatter<'_>,
1529        ctx: &mut PlanRenderingContext<'_, Plan>,
1530    ) -> fmt::Result {
1531        if ctx.config.verbose_syntax {
1532            self.fmt_verbose_text(f, ctx)
1533        } else {
1534            self.fmt_default_text(f, ctx)
1535        }
1536    }
1537}
1538impl BasicPlan {
1539    #[allow(clippy::needless_pass_by_ref_mut)]
1540    fn fmt_default_text(
1541        &self,
1542        f: &mut fmt::Formatter<'_>,
1543        ctx: &mut PlanRenderingContext<'_, Plan>,
1544    ) -> fmt::Result {
1545        let mode = HumanizedExplain::new(ctx.config.redacted);
1546        match self {
1547            BasicPlan::Single(SingleBasicPlan {
1548                expr,
1549                fused_unnest_list: _,
1550            }) => {
1551                let agg = mode.expr(expr, None);
1552                writeln!(f, "{}Aggregation: {agg}", ctx.indent)?;
1553            }
1554            BasicPlan::Multiple(aggs) => {
1555                let mode = HumanizedExplain::new(ctx.config.redacted);
1556                write!(f, "{}Aggregations:", ctx.indent)?;
1557
1558                for agg in aggs.iter() {
1559                    let agg = mode.expr(agg, None);
1560                    write!(f, " {agg}")?;
1561                }
1562                writeln!(f)?;
1563            }
1564        }
1565        Ok(())
1566    }
1567
1568    #[allow(clippy::needless_pass_by_ref_mut)]
1569    fn fmt_verbose_text(
1570        &self,
1571        f: &mut fmt::Formatter<'_>,
1572        ctx: &mut PlanRenderingContext<'_, Plan>,
1573    ) -> fmt::Result {
1574        let mode = HumanizedExplain::new(ctx.config.redacted);
1575        match self {
1576            BasicPlan::Single(SingleBasicPlan {
1577                expr,
1578                fused_unnest_list,
1579            }) => {
1580                let agg = mode.expr(expr, None);
1581                let fused_unnest_list = if *fused_unnest_list {
1582                    ", fused_unnest_list=true"
1583                } else {
1584                    ""
1585                };
1586                writeln!(f, "{}aggr=({}{})", ctx.indent, agg, fused_unnest_list)?;
1587            }
1588            BasicPlan::Multiple(aggs) => {
1589                for (i, agg) in aggs.iter().enumerate() {
1590                    let agg = mode.expr(agg, None);
1591                    writeln!(f, "{}aggrs[{}]={}", ctx.indent, i, agg)?;
1592                }
1593            }
1594        }
1595        Ok(())
1596    }
1597}
1598
1599/// Helper struct for rendering an arrangement.
1600struct Arrangement<'a> {
1601    key: &'a Vec<MirScalarExpr>,
1602    permutation: Permutation<'a>,
1603    thinning: &'a Vec<usize>,
1604}
1605
1606impl<'a> From<&'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> for Arrangement<'a> {
1607    fn from(
1608        (key, permutation, thinning): &'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
1609    ) -> Self {
1610        Arrangement {
1611            key,
1612            permutation: Permutation(permutation),
1613            thinning,
1614        }
1615    }
1616}
1617
1618impl<'a> DisplayText<PlanRenderingContext<'_, Plan>> for Arrangement<'a> {
1619    fn fmt_text(
1620        &self,
1621        f: &mut fmt::Formatter<'_>,
1622        ctx: &mut PlanRenderingContext<'_, Plan>,
1623    ) -> fmt::Result {
1624        if ctx.config.verbose_syntax {
1625            self.fmt_verbose_text(f, ctx)
1626        } else {
1627            self.fmt_default_text(f, ctx)
1628        }
1629    }
1630}
1631
1632impl<'a> Arrangement<'a> {
1633    #[allow(clippy::needless_pass_by_ref_mut)]
1634    fn fmt_default_text(
1635        &self,
1636        f: &mut fmt::Formatter<'_>,
1637        ctx: &PlanRenderingContext<'_, Plan>,
1638    ) -> fmt::Result {
1639        let mode = HumanizedExplain::new(ctx.config.redacted);
1640        if !self.key.is_empty() {
1641            let key = mode.seq(self.key, None);
1642            let key = CompactScalars(key);
1643            write!(f, "{key}")
1644        } else {
1645            write!(f, "(empty key)")
1646        }
1647    }
1648
1649    #[allow(clippy::needless_pass_by_ref_mut)]
1650    fn fmt_verbose_text(
1651        &self,
1652        f: &mut fmt::Formatter<'_>,
1653        ctx: &PlanRenderingContext<'_, Plan>,
1654    ) -> fmt::Result {
1655        let mode = HumanizedExplain::new(ctx.config.redacted);
1656        // prepare key
1657        let key = mode.seq(self.key, None);
1658        let key = CompactScalars(key);
1659        // prepare perumation map
1660        let permutation = &self.permutation;
1661        // prepare thinning
1662        let thinning = Indices(self.thinning);
1663        // write the arrangement spec
1664        write!(
1665            f,
1666            "{{ key=[{}], permutation={}, thinning=({}) }}",
1667            key, permutation, thinning
1668        )
1669    }
1670}
1671
/// Helper struct for rendering a permutation.
struct Permutation<'a>(&'a Vec<usize>);

impl fmt::Display for Permutation<'_> {
    /// Writes `id` when every entry equals its own index (this includes the
    /// empty list). Otherwise writes the entries that differ from their index
    /// as `{#<index>: #<value>, ...}`, in index order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only entries that differ from their index are worth printing.
        let pairs: Vec<String> = self
            .0
            .iter()
            .enumerate()
            .filter(|&(x, &y)| x != y)
            .map(|(x, y)| format!("#{x}: #{y}"))
            .collect();

        if pairs.is_empty() {
            f.write_str("id")
        } else {
            write!(f, "{{{}}}", pairs.join(", "))
        }
    }
}
1689
1690/// Annotations for physical plans.
1691struct PlanAnnotations {
1692    config: ExplainConfig,
1693    node_id: LirId,
1694}
1695
1696// The current implementation deviates from the `AnnotatedPlan` used in `Mir~`-based plans. This is
1697// fine, since at the moment the only attribute we are going to explain is the `node_id`, which at
1698// the moment is kept inline with the `Plan` variants. If at some point in the future we want to
1699// start deriving and printing attributes that are derived ad-hoc, however, we might want to adopt
1700// `AnnotatedPlan` here as well.
1701impl PlanAnnotations {
1702    fn new(config: ExplainConfig, plan: &Plan) -> Self {
1703        let node_id = plan.lir_id;
1704        Self { config, node_id }
1705    }
1706}
1707
1708impl fmt::Display for PlanAnnotations {
1709    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1710        if self.config.node_ids {
1711            f.debug_struct(" //")
1712                .field("node_id", &self.node_id)
1713                .finish()
1714        } else {
1715            // No physical plan annotations enabled.
1716            Ok(())
1717        }
1718    }
1719}