mz_compute_types/explain/
text.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `EXPLAIN ... AS TEXT` support for LIR structures.
11//!
12//! The format adheres to the following conventions:
13//! 1. In general, every line that starts with an uppercase character
14//!    corresponds to a [`Plan`] variant.
15//! 2. Whenever the variant has an attached `~Plan`, the printed name is
16//!    `$V::$P` where `$V` identifies the variant and `$P` the plan.
//! 3. The fields of a `~Plan` struct attached to a [`Plan`] are rendered as if
//!    they were part of the variant itself.
19//! 4. Non-recursive parameters of each sub-plan are written as `$key=$val`
20//!    pairs on the same line or as lowercase `$key` fields on indented lines.
21//! 5. A single non-recursive parameter can be written just as `$val`.
22
23use std::fmt;
24use std::ops::Deref;
25
26use itertools::{Itertools, izip};
27use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
28use mz_expr::{Id, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_ore::str::{IndentLike, StrExt, separated};
31use mz_repr::explain::text::DisplayText;
32use mz_repr::explain::{
33    CompactScalarSeq, CompactScalars, ExplainConfig, Indices, PlanRenderingContext,
34};
35
36use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
37use crate::plan::join::linear_join::LinearStagePlan;
38use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
39use crate::plan::reduce::{
40    AccumulablePlan, BasicPlan, BucketedPlan, CollationPlan, HierarchicalPlan, MonotonicPlan,
41    SingleBasicPlan,
42};
43use crate::plan::threshold::ThresholdPlan;
44use crate::plan::{AvailableCollections, LirId, Plan, PlanNode};
45
46impl DisplayText<PlanRenderingContext<'_, Plan>> for Plan {
47    fn fmt_text(
48        &self,
49        f: &mut fmt::Formatter<'_>,
50        ctx: &mut PlanRenderingContext<'_, Plan>,
51    ) -> fmt::Result {
52        if ctx.config.verbose_syntax {
53            self.fmt_verbose_text(f, ctx)
54        } else {
55            self.fmt_default_text(f, ctx)
56        }
57    }
58}
59
60impl Plan {
61    // NOTE: This code needs to be kept in sync with the `Display` instance for
62    // `RenderPlan:ExprHumanizer`.
63    //
64    // This code determines what you see in `EXPLAIN`; that other code
65    // determine what you see when you run `mz_lir_mapping`.
66    fn fmt_default_text(
67        &self,
68        f: &mut fmt::Formatter<'_>,
69        ctx: &mut PlanRenderingContext<'_, Plan>,
70    ) -> fmt::Result {
71        use PlanNode::*;
72
73        let mode = HumanizedExplain::new(ctx.config.redacted);
74        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
75
76        match &self.node {
77            Constant { rows } => {
78                write!(f, "{}→Constant ", ctx.indent)?;
79
80                match rows {
81                    Ok(rows) => write!(
82                        f,
83                        "({} row{})",
84                        rows.len(),
85                        if rows.len() == 1 { "" } else { "s" }
86                    )?,
87                    Err(err) => {
88                        if mode.redacted() {
89                            write!(f, "(error: █)")?;
90                        } else {
91                            write!(f, "(error: {})", err.to_string().quoted(),)?;
92                        }
93                    }
94                }
95
96                writeln!(f, "{annotations}")?;
97            }
98            Get { id, keys, plan } => {
99                ctx.indent.set(); // mark the current indent level
100
101                // Resolve the id as a string.
102                let id = match id {
103                    Id::Local(id) => id.to_string(),
104                    Id::Global(id) => ctx
105                        .humanizer
106                        .humanize_id(*id)
107                        .unwrap_or_else(|| id.to_string()),
108                };
109                // Render plan-specific fields.
110                use crate::plan::GetPlan;
111                match plan {
112                    GetPlan::PassArrangements => {
113                        if keys.raw && keys.arranged.is_empty() {
114                            writeln!(f, "{}→Stream {id}{annotations}", ctx.indent)?;
115                        } else {
116                            // we're not reporting on whether or not `raw` is set
117                            // we're not reporting on how many arrangements there are
118                            writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
119                        }
120                    }
121                    GetPlan::Arrangement(key, Some(val), mfp) => {
122                        if !mfp.is_identity() {
123                            writeln!(f, "{}→Fused Map/Filter/Project", ctx.indent)?;
124                            ctx.indent += 1;
125                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
126                            ctx.indent += 1;
127                        }
128
129                        writeln!(f, "{}→Index Lookup on {id}{annotations}", ctx.indent)?;
130                        ctx.indent += 1;
131                        let key = CompactScalars(mode.seq(key, None));
132                        write!(f, "{}Key: ({key}) ", ctx.indent)?;
133                        let val = mode.expr(val, None);
134                        writeln!(f, "Value: {val}")?;
135                    }
136                    GetPlan::Arrangement(key, None, mfp) => {
137                        if !mfp.is_identity() {
138                            writeln!(f, "{}→Fused Map/Filter/Project", ctx.indent)?;
139                            ctx.indent += 1;
140                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
141                            ctx.indent += 1;
142                        }
143
144                        writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
145                        ctx.indent += 1;
146                        let key = CompactScalars(mode.seq(key, None));
147                        writeln!(f, "{}Key: ({key})", ctx.indent)?;
148                    }
149                    GetPlan::Collection(mfp) => {
150                        if !mfp.is_identity() {
151                            writeln!(f, "{}→Fused Map/Filter/Project", ctx.indent)?;
152                            ctx.indent += 1;
153                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
154                            ctx.indent += 1;
155                        }
156
157                        writeln!(f, "{}→Read {id}{annotations}", ctx.indent)?;
158                    }
159                }
160                ctx.indent.reset(); // reset the original indent level
161            }
162            Let { id, value, body } => {
163                let mut bindings = vec![(id, value.as_ref())];
164                let mut head = body.as_ref();
165
166                // Render Let-blocks nested in the body an outer Let-block in one step
167                // with a flattened list of bindings
168                while let Let { id, value, body } = &head.node {
169                    bindings.push((id, value.as_ref()));
170                    head = body.as_ref();
171                }
172
173                writeln!(f, "{}→With", ctx.indent)?;
174                ctx.indented(|ctx| {
175                    for (id, value) in bindings.iter() {
176                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
177                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
178                    }
179                    Ok(())
180                })?;
181                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
182                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
183            }
184            LetRec {
185                ids,
186                values,
187                limits,
188                body,
189            } => {
190                let bindings = izip!(ids.iter(), values, limits).collect_vec();
191                let head = body.as_ref();
192
193                writeln!(f, "{}→With Mutually Recursive", ctx.indent)?;
194                ctx.indented(|ctx| {
195                    for (id, value, limit) in bindings.iter() {
196                        if let Some(limit) = limit {
197                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
198                        } else {
199                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
200                        }
201                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
202                    }
203                    Ok(())
204                })?;
205                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
206                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
207            }
208            Mfp {
209                input,
210                mfp,
211                input_key_val: _,
212            } => {
213                writeln!(f, "{}→Map/Filter/Project{annotations}", ctx.indent)?;
214                ctx.indent.set();
215
216                ctx.indent += 1;
217                mode.expr(mfp, None).fmt_default_text(f, ctx)?;
218
219                // one more nesting level if we showed anything for the MFP
220                if !mfp.is_identity() {
221                    ctx.indent += 1;
222                }
223                input.fmt_text(f, ctx)?;
224                ctx.indent.reset();
225            }
226            FlatMap {
227                input_key: _,
228                input,
229                exprs,
230                func,
231                mfp_after,
232            } => {
233                ctx.indent.set();
234                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
235                    writeln!(f, "{}→Fused Map/Filter/Project", ctx.indent)?;
236                    ctx.indent += 1;
237                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
238                    ctx.indent += 1;
239                }
240
241                let exprs = mode.seq(exprs, None);
242                let exprs = CompactScalars(exprs);
243                writeln!(
244                    f,
245                    "{}→Table Function {func}({exprs}){annotations}",
246                    ctx.indent
247                )?;
248                ctx.indent += 1;
249
250                input.fmt_text(f, ctx)?;
251
252                ctx.indent.reset();
253            }
254            Join { inputs, plan } => {
255                use crate::plan::join::JoinPlan;
256                match plan {
257                    JoinPlan::Linear(plan) => {
258                        write!(f, "{}→Differential Join", ctx.indent)?;
259                        write!(f, " %{}", plan.source_relation)?;
260                        for dsp in &plan.stage_plans {
261                            write!(f, " » %{}", dsp.lookup_relation)?;
262                        }
263                        writeln!(f, "{annotations}")?;
264                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
265                    }
266                    JoinPlan::Delta(plan) => {
267                        write!(f, "{}→Delta Join", ctx.indent)?;
268                        for dpp in &plan.path_plans {
269                            write!(f, " [%{}", dpp.source_relation)?;
270
271                            for dsp in &dpp.stage_plans {
272                                write!(f, " » %{}", dsp.lookup_relation)?;
273                            }
274                            write!(f, "]")?;
275                        }
276                        writeln!(f, "{annotations}")?;
277                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
278                    }
279                }
280
281                ctx.indented(|ctx| {
282                    for input in inputs {
283                        input.fmt_text(f, ctx)?;
284                    }
285                    Ok(())
286                })?;
287            }
288            Reduce {
289                input_key: _,
290                input,
291                key_val_plan,
292                plan,
293                mfp_after,
294            } => {
295                ctx.indent.set();
296                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
297                    writeln!(f, "{}→Fused Map/Filter/Project", ctx.indent)?;
298                    ctx.indent += 1;
299                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
300                    ctx.indent += 1;
301                }
302
303                use crate::plan::reduce::ReducePlan;
304                match plan {
305                    ReducePlan::Distinct => {
306                        writeln!(f, "{}→Distinct GroupAggregate{annotations}", ctx.indent)?;
307                    }
308                    ReducePlan::Accumulable(plan) => {
309                        writeln!(f, "{}→Accumulable GroupAggregate{annotations}", ctx.indent)?;
310                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
311                    }
312                    ReducePlan::Hierarchical(
313                        plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }),
314                    ) => {
315                        write!(
316                            f,
317                            "{}→Bucketed Hierarchical GroupAggregate (buckets: ",
318                            ctx.indent
319                        )?;
320                        for bucket in buckets {
321                            write!(f, " {bucket}")?;
322                        }
323                        writeln!(f, "){annotations}")?;
324                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
325                    }
326                    ReducePlan::Hierarchical(
327                        plan @ HierarchicalPlan::Monotonic(MonotonicPlan {
328                            must_consolidate, ..
329                        }),
330                    ) => {
331                        write!(f, "{}→", ctx.indent)?;
332                        if *must_consolidate {
333                            write!(f, "Consolidating ")?;
334                        }
335                        writeln!(f, "Monotonic GroupAggregate{annotations}",)?;
336                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
337                    }
338                    ReducePlan::Basic(plan) => {
339                        ctx.indent.set();
340                        if let BasicPlan::Single(SingleBasicPlan {
341                            fused_unnest_list, ..
342                        }) = &plan
343                        {
344                            if *fused_unnest_list {
345                                writeln!(f, "{}→Fused Table Function unnest_list", ctx.indent)?;
346                                ctx.indent += 1;
347                            }
348                        }
349                        writeln!(
350                            f,
351                            "{}→Non-incremental GroupAggregate{annotations}",
352                            ctx.indent
353                        )?;
354                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
355                        ctx.indent.reset();
356                    }
357                    ReducePlan::Collation(plan) => {
358                        writeln!(
359                            f,
360                            "{}→Collated Multi-GroupAggregate{annotations}",
361                            ctx.indent
362                        )?;
363                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
364                    }
365                }
366
367                ctx.indented(|ctx| {
368                    let kvp = key_val_plan.key_plan.deref();
369                    if !kvp.is_identity() {
370                        writeln!(f, "{}Key:", ctx.indent)?;
371                        ctx.indented(|ctx| {
372                            let key_plan = mode.expr(kvp, None);
373                            key_plan.fmt_default_text(f, ctx)
374                        })?;
375                    }
376
377                    input.fmt_text(f, ctx)
378                })?;
379
380                ctx.indent.reset();
381            }
382            TopK { input, top_k_plan } => {
383                use crate::plan::top_k::TopKPlan;
384                match top_k_plan {
385                    TopKPlan::MonotonicTop1(plan) => {
386                        write!(f, "{}→", ctx.indent)?;
387                        if plan.must_consolidate {
388                            write!(f, "Consolidating ")?;
389                        }
390                        writeln!(f, "Monotonic Top1{annotations}")?;
391
392                        ctx.indented(|ctx| {
393                            if plan.group_key.len() > 0 {
394                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
395                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
396                            }
397                            if plan.order_key.len() > 0 {
398                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
399                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
400                            }
401                            Ok(())
402                        })?;
403                    }
404                    TopKPlan::MonotonicTopK(plan) => {
405                        write!(f, "{}→", ctx.indent)?;
406                        if plan.must_consolidate {
407                            write!(f, "Consolidating ")?;
408                        }
409                        writeln!(f, "Monotonic TopK{annotations}")?;
410
411                        ctx.indented(|ctx| {
412                            if plan.group_key.len() > 0 {
413                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
414                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
415                            }
416                            if plan.order_key.len() > 0 {
417                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
418                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
419                            }
420                            if let Some(limit) = &plan.limit {
421                                let limit = mode.expr(limit, None);
422                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
423                            }
424                            Ok(())
425                        })?;
426                    }
427                    TopKPlan::Basic(plan) => {
428                        writeln!(f, "{}→Non-monotonic TopK{annotations}", ctx.indent)?;
429
430                        ctx.indented(|ctx| {
431                            if plan.group_key.len() > 0 {
432                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
433                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
434                            }
435                            if plan.order_key.len() > 0 {
436                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
437                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
438                            }
439                            if let Some(limit) = &plan.limit {
440                                let limit = mode.expr(limit, None);
441                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
442                            }
443                            if plan.offset != 0 {
444                                let offset = plan.offset;
445                                writeln!(f, "{}Offset {offset}", ctx.indent)?;
446                            }
447                            Ok(())
448                        })?;
449                    }
450                }
451
452                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
453            }
454            Negate { input } => {
455                writeln!(f, "{}→Negate Diffs{annotations}", ctx.indent)?;
456
457                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
458            }
459            Threshold {
460                input,
461                threshold_plan,
462            } => {
463                match threshold_plan {
464                    ThresholdPlan::Basic(plan) => {
465                        write!(f, "{}→Threshold Diffs ", ctx.indent)?;
466                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
467                        ensure_arrangement.fmt_text(f, ctx)?;
468                        writeln!(f, "{annotations}")?;
469                    }
470                };
471
472                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
473            }
474            Union {
475                inputs,
476                consolidate_output,
477            } => {
478                write!(f, "{}→", ctx.indent)?;
479                if *consolidate_output {
480                    write!(f, "Consolidating ")?;
481                }
482                writeln!(f, "Union{annotations}")?;
483
484                ctx.indented(|ctx| {
485                    for input in inputs.iter() {
486                        input.fmt_text(f, ctx)?;
487                    }
488                    Ok(())
489                })?;
490            }
491            ArrangeBy {
492                input_key: _,
493                input,
494                input_mfp,
495                forms,
496            } => {
497                if forms.raw && forms.arranged.is_empty() {
498                    soft_assert_or_log!(forms.raw, "raw stream with no arrangements");
499                    writeln!(f, "{}→Unarranged Raw Stream{annotations}", ctx.indent)?;
500                } else {
501                    write!(f, "{}→Arrange", ctx.indent)?;
502
503                    if !forms.arranged.is_empty() {
504                        let mode = HumanizedExplain::new(ctx.config.redacted);
505                        for (key, _, _) in &forms.arranged {
506                            if !key.is_empty() {
507                                let key = mode.seq(key, None);
508                                let key = CompactScalars(key);
509                                write!(f, " ({key})")?;
510                            } else {
511                                write!(f, " (empty key)")?;
512                            }
513                        }
514                    }
515                    writeln!(f, "{annotations}")?;
516                }
517
518                ctx.indented(|ctx| {
519                    if !input_mfp.expressions.is_empty() || !input_mfp.predicates.is_empty() {
520                        writeln!(f, "{}Pre-process Map/Filter/Project", ctx.indent)?;
521                        ctx.indented(|ctx| mode.expr(input_mfp, None).fmt_default_text(f, ctx))?;
522                    }
523
524                    input.fmt_text(f, ctx)
525                })?;
526            }
527        }
528
529        Ok(())
530    }
531
532    fn fmt_verbose_text(
533        &self,
534        f: &mut fmt::Formatter<'_>,
535        ctx: &mut PlanRenderingContext<'_, Plan>,
536    ) -> fmt::Result {
537        use PlanNode::*;
538
539        let mode = HumanizedExplain::new(ctx.config.redacted);
540        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
541
542        match &self.node {
543            Constant { rows } => match rows {
544                Ok(rows) => {
545                    if !rows.is_empty() {
546                        writeln!(f, "{}Constant{}", ctx.indent, annotations)?;
547                        ctx.indented(|ctx| {
548                            fmt_text_constant_rows(
549                                f,
550                                rows.iter().map(|(data, _, diff)| (data, diff)),
551                                &mut ctx.indent,
552                                ctx.config.redacted,
553                            )
554                        })?;
555                    } else {
556                        writeln!(f, "{}Constant <empty>{}", ctx.indent, annotations)?;
557                    }
558                }
559                Err(err) => {
560                    if mode.redacted() {
561                        writeln!(f, "{}Error █{}", ctx.indent, annotations)?;
562                    } else {
563                        {
564                            writeln!(
565                                f,
566                                "{}Error {}{}",
567                                ctx.indent,
568                                err.to_string().quoted(),
569                                annotations
570                            )?;
571                        }
572                    }
573                }
574            },
575
576            Get { id, keys, plan } => {
577                ctx.indent.set(); // mark the current indent level
578
579                // Resolve the id as a string.
580                let id = match id {
581                    Id::Local(id) => id.to_string(),
582                    Id::Global(id) => ctx
583                        .humanizer
584                        .humanize_id(*id)
585                        .unwrap_or_else(|| id.to_string()),
586                };
587                // Render plan-specific fields.
588                use crate::plan::GetPlan;
589                match plan {
590                    GetPlan::PassArrangements => {
591                        writeln!(
592                            f,
593                            "{}Get::PassArrangements {}{}",
594                            ctx.indent, id, annotations
595                        )?;
596                        ctx.indent += 1;
597                    }
598                    GetPlan::Arrangement(key, val, mfp) => {
599                        writeln!(f, "{}Get::Arrangement {}{}", ctx.indent, id, annotations)?;
600                        ctx.indent += 1;
601                        mode.expr(mfp, None).fmt_text(f, ctx)?;
602                        {
603                            let key = mode.seq(key, None);
604                            let key = CompactScalars(key);
605                            writeln!(f, "{}key={}", ctx.indent, key)?;
606                        }
607                        if let Some(val) = val {
608                            let val = mode.expr(val, None);
609                            writeln!(f, "{}val={}", ctx.indent, val)?;
610                        }
611                    }
612                    GetPlan::Collection(mfp) => {
613                        writeln!(f, "{}Get::Collection {}{}", ctx.indent, id, annotations)?;
614                        ctx.indent += 1;
615                        mode.expr(mfp, None).fmt_text(f, ctx)?;
616                    }
617                }
618
619                // Render plan-agnostic fields (common for all plans for this variant).
620                keys.fmt_text(f, ctx)?;
621
622                ctx.indent.reset(); // reset the original indent level
623            }
624            Let { id, value, body } => {
625                let mut bindings = vec![(id, value.as_ref())];
626                let mut head = body.as_ref();
627
628                // Render Let-blocks nested in the body an outer Let-block in one step
629                // with a flattened list of bindings
630                while let Let { id, value, body } = &head.node {
631                    bindings.push((id, value.as_ref()));
632                    head = body.as_ref();
633                }
634
635                writeln!(f, "{}With", ctx.indent)?;
636                ctx.indented(|ctx| {
637                    for (id, value) in bindings.iter() {
638                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
639                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
640                    }
641                    Ok(())
642                })?;
643                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
644                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
645            }
646            LetRec {
647                ids,
648                values,
649                limits,
650                body,
651            } => {
652                let bindings = izip!(ids.iter(), values, limits).collect_vec();
653                let head = body.as_ref();
654
655                writeln!(f, "{}With Mutually Recursive", ctx.indent)?;
656                ctx.indented(|ctx| {
657                    for (id, value, limit) in bindings.iter() {
658                        if let Some(limit) = limit {
659                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
660                        } else {
661                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
662                        }
663                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
664                    }
665                    Ok(())
666                })?;
667                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
668                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
669            }
670            Mfp {
671                input,
672                mfp,
673                input_key_val,
674            } => {
675                writeln!(f, "{}Mfp{}", ctx.indent, annotations)?;
676                ctx.indented(|ctx| {
677                    mode.expr(mfp, None).fmt_text(f, ctx)?;
678                    if let Some((key, val)) = input_key_val {
679                        {
680                            let key = mode.seq(key, None);
681                            let key = CompactScalars(key);
682                            writeln!(f, "{}input_key={}", ctx.indent, key)?;
683                        }
684                        if let Some(val) = val {
685                            let val = mode.expr(val, None);
686                            writeln!(f, "{}input_val={}", ctx.indent, val)?;
687                        }
688                    }
689                    input.fmt_text(f, ctx)
690                })?;
691            }
692            FlatMap {
693                input_key,
694                input,
695                exprs,
696                func,
697                mfp_after,
698            } => {
699                let exprs = mode.seq(exprs, None);
700                let exprs = CompactScalars(exprs);
701                writeln!(
702                    f,
703                    "{}FlatMap {}({}){}",
704                    ctx.indent, func, exprs, annotations
705                )?;
706                ctx.indented(|ctx| {
707                    if let Some(key) = input_key {
708                        let key = mode.seq(key, None);
709                        let key = CompactScalars(key);
710                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
711                    }
712                    if !mfp_after.is_identity() {
713                        writeln!(f, "{}mfp_after", ctx.indent)?;
714                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
715                    }
716                    input.fmt_text(f, ctx)
717                })?;
718            }
719            Join { inputs, plan } => {
720                use crate::plan::join::JoinPlan;
721                match plan {
722                    JoinPlan::Linear(plan) => {
723                        writeln!(f, "{}Join::Linear{}", ctx.indent, annotations)?;
724                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
725                    }
726                    JoinPlan::Delta(plan) => {
727                        writeln!(f, "{}Join::Delta{}", ctx.indent, annotations)?;
728                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
729                    }
730                }
731                ctx.indented(|ctx| {
732                    for input in inputs {
733                        input.fmt_text(f, ctx)?;
734                    }
735                    Ok(())
736                })?;
737            }
738            Reduce {
739                input_key,
740                input,
741                key_val_plan,
742                plan,
743                mfp_after,
744            } => {
745                use crate::plan::reduce::ReducePlan;
746                match plan {
747                    ReducePlan::Distinct => {
748                        writeln!(f, "{}Reduce::Distinct{}", ctx.indent, annotations)?;
749                    }
750                    ReducePlan::Accumulable(plan) => {
751                        writeln!(f, "{}Reduce::Accumulable{}", ctx.indent, annotations)?;
752                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
753                    }
754                    ReducePlan::Hierarchical(plan) => {
755                        writeln!(f, "{}Reduce::Hierarchical{}", ctx.indent, annotations)?;
756                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
757                    }
758                    ReducePlan::Basic(plan) => {
759                        writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
760                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
761                    }
762                    ReducePlan::Collation(plan) => {
763                        writeln!(f, "{}Reduce::Collation{}", ctx.indent, annotations)?;
764                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
765                    }
766                }
767                ctx.indented(|ctx| {
768                    if let Some(key) = input_key {
769                        let key = mode.seq(key, None);
770                        let key = CompactScalars(key);
771                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
772                    }
773                    if key_val_plan.key_plan.deref().is_identity() {
774                        writeln!(f, "{}key_plan=id", ctx.indent)?;
775                    } else {
776                        writeln!(f, "{}key_plan", ctx.indent)?;
777                        ctx.indented(|ctx| {
778                            let key_plan = mode.expr(key_val_plan.key_plan.deref(), None);
779                            key_plan.fmt_text(f, ctx)
780                        })?;
781                    }
782                    if key_val_plan.val_plan.deref().is_identity() {
783                        writeln!(f, "{}val_plan=id", ctx.indent)?;
784                    } else {
785                        writeln!(f, "{}val_plan", ctx.indent)?;
786                        ctx.indented(|ctx| {
787                            let val_plan = mode.expr(key_val_plan.val_plan.deref(), None);
788                            val_plan.fmt_text(f, ctx)
789                        })?;
790                    }
791                    if !mfp_after.is_identity() {
792                        writeln!(f, "{}mfp_after", ctx.indent)?;
793                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
794                    }
795
796                    input.fmt_text(f, ctx)
797                })?;
798            }
799            TopK { input, top_k_plan } => {
800                use crate::plan::top_k::TopKPlan;
801                match top_k_plan {
802                    TopKPlan::MonotonicTop1(plan) => {
803                        write!(f, "{}TopK::MonotonicTop1", ctx.indent)?;
804                        if plan.group_key.len() > 0 {
805                            let group_by = mode.seq(&plan.group_key, None);
806                            let group_by = CompactScalars(group_by);
807                            write!(f, " group_by=[{}]", group_by)?;
808                        }
809                        if plan.order_key.len() > 0 {
810                            let order_by = mode.seq(&plan.order_key, None);
811                            let order_by = separated(", ", order_by);
812                            write!(f, " order_by=[{}]", order_by)?;
813                        }
814                        if plan.must_consolidate {
815                            write!(f, " must_consolidate")?;
816                        }
817                    }
818                    TopKPlan::MonotonicTopK(plan) => {
819                        write!(f, "{}TopK::MonotonicTopK", ctx.indent)?;
820                        if plan.group_key.len() > 0 {
821                            let group_by = mode.seq(&plan.group_key, None);
822                            let group_by = CompactScalars(group_by);
823                            write!(f, " group_by=[{}]", group_by)?;
824                        }
825                        if plan.order_key.len() > 0 {
826                            let order_by = mode.seq(&plan.order_key, None);
827                            let order_by = separated(", ", order_by);
828                            write!(f, " order_by=[{}]", order_by)?;
829                        }
830                        if let Some(limit) = &plan.limit {
831                            let limit = mode.expr(limit, None);
832                            write!(f, " limit={}", limit)?;
833                        }
834                        if plan.must_consolidate {
835                            write!(f, " must_consolidate")?;
836                        }
837                    }
838                    TopKPlan::Basic(plan) => {
839                        write!(f, "{}TopK::Basic", ctx.indent)?;
840                        if plan.group_key.len() > 0 {
841                            let group_by = mode.seq(&plan.group_key, None);
842                            let group_by = CompactScalars(group_by);
843                            write!(f, " group_by=[{}]", group_by)?;
844                        }
845                        if plan.order_key.len() > 0 {
846                            let order_by = mode.seq(&plan.order_key, None);
847                            let order_by = separated(", ", order_by);
848                            write!(f, " order_by=[{}]", order_by)?;
849                        }
850                        if let Some(limit) = &plan.limit {
851                            let limit = mode.expr(limit, None);
852                            write!(f, " limit={}", limit)?;
853                        }
854                        if &plan.offset > &0 {
855                            write!(f, " offset={}", plan.offset)?;
856                        }
857                    }
858                }
859                writeln!(f, "{}", annotations)?;
860                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
861            }
862            Negate { input } => {
863                writeln!(f, "{}Negate{}", ctx.indent, annotations)?;
864                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
865            }
866            Threshold {
867                input,
868                threshold_plan,
869            } => {
870                use crate::plan::threshold::ThresholdPlan;
871                match threshold_plan {
872                    ThresholdPlan::Basic(plan) => {
873                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
874                        write!(f, "{}Threshold::Basic", ctx.indent)?;
875                        write!(f, " ensure_arrangement=")?;
876                        ensure_arrangement.fmt_text(f, ctx)?;
877                        writeln!(f, "{}", annotations)?;
878                    }
879                };
880                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
881            }
882            Union {
883                inputs,
884                consolidate_output,
885            } => {
886                if *consolidate_output {
887                    writeln!(
888                        f,
889                        "{}Union consolidate_output={}{}",
890                        ctx.indent, consolidate_output, annotations
891                    )?;
892                } else {
893                    writeln!(f, "{}Union{}", ctx.indent, annotations)?;
894                }
895                ctx.indented(|ctx| {
896                    for input in inputs.iter() {
897                        input.fmt_text(f, ctx)?;
898                    }
899                    Ok(())
900                })?;
901            }
902            ArrangeBy {
903                input_key,
904                input,
905                input_mfp,
906                forms,
907            } => {
908                writeln!(f, "{}ArrangeBy{}", ctx.indent, annotations)?;
909                ctx.indented(|ctx| {
910                    if let Some(key) = input_key {
911                        let key = mode.seq(key, None);
912                        let key = CompactScalars(key);
913                        writeln!(f, "{}input_key=[{}]", ctx.indent, key)?;
914                    }
915                    mode.expr(input_mfp, None).fmt_text(f, ctx)?;
916                    forms.fmt_text(f, ctx)?;
917                    // Render input
918                    input.fmt_text(f, ctx)
919                })?;
920            }
921        }
922
923        Ok(())
924    }
925}
926
927impl DisplayText<PlanRenderingContext<'_, Plan>> for AvailableCollections {
928    fn fmt_text(
929        &self,
930        f: &mut fmt::Formatter<'_>,
931        ctx: &mut PlanRenderingContext<'_, Plan>,
932    ) -> fmt::Result {
933        if ctx.config.verbose_syntax {
934            self.fmt_verbose_text(f, ctx)
935        } else {
936            self.fmt_default_text(f, ctx)
937        }
938    }
939}
940impl AvailableCollections {
941    fn fmt_default_text(
942        &self,
943        f: &mut fmt::Formatter<'_>,
944        ctx: &mut PlanRenderingContext<'_, Plan>,
945    ) -> fmt::Result {
946        let plural = if self.arranged.len() == 1 { "" } else { "s" };
947        write!(
948            f,
949            "{}Keys: {} arrangement{plural} available",
950            ctx.indent,
951            self.arranged.len()
952        )?;
953
954        if self.raw {
955            writeln!(f, ", plus raw stream")?;
956        } else {
957            writeln!(f, ", no raw stream")?;
958        }
959
960        ctx.indented(|ctx| {
961            for (i, arrangement) in self.arranged.iter().enumerate() {
962                let arrangement = Arrangement::from(arrangement);
963                write!(f, "{}Arrangement {i}: ", ctx.indent)?;
964                arrangement.fmt_text(f, ctx)?;
965                writeln!(f, "")?;
966            }
967            Ok(())
968        })?;
969
970        Ok(())
971    }
972
973    fn fmt_verbose_text(
974        &self,
975        f: &mut fmt::Formatter<'_>,
976        ctx: &mut PlanRenderingContext<'_, Plan>,
977    ) -> fmt::Result {
978        // raw field
979        let raw = &self.raw;
980        writeln!(f, "{}raw={}", ctx.indent, raw)?;
981        // arranged field
982        for (i, arrangement) in self.arranged.iter().enumerate() {
983            let arrangement = Arrangement::from(arrangement);
984            write!(f, "{}arrangements[{}]=", ctx.indent, i)?;
985            arrangement.fmt_text(f, ctx)?;
986            writeln!(f, "")?;
987        }
988        Ok(())
989    }
990}
991
992impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearJoinPlan {
993    fn fmt_text(
994        &self,
995        f: &mut fmt::Formatter<'_>,
996        ctx: &mut PlanRenderingContext<'_, Plan>,
997    ) -> fmt::Result {
998        if ctx.config.verbose_syntax {
999            self.fmt_verbose_text(f, ctx)
1000        } else {
1001            self.fmt_default_text(f, ctx)
1002        }
1003    }
1004}
1005impl LinearJoinPlan {
1006    #[allow(clippy::needless_pass_by_ref_mut)]
1007    fn fmt_default_text(
1008        &self,
1009        f: &mut fmt::Formatter<'_>,
1010        ctx: &mut PlanRenderingContext<'_, Plan>,
1011    ) -> fmt::Result {
1012        for (i, plan) in self.stage_plans.iter().enumerate().rev() {
1013            let lookup_relation = &plan.lookup_relation;
1014            write!(f, "{}Join stage {i} in %{lookup_relation}", ctx.indent)?;
1015            if !plan.lookup_key.is_empty() {
1016                let lookup_key = CompactScalarSeq(&plan.lookup_key);
1017                writeln!(f, " with lookup key {lookup_key}",)?;
1018            } else {
1019                writeln!(f)?;
1020            }
1021            if plan.closure.maps_or_filters() {
1022                ctx.indented(|ctx| plan.closure.fmt_default_text(f, ctx))?;
1023            }
1024        }
1025        if let Some(final_closure) = &self.final_closure {
1026            if final_closure.maps_or_filters() {
1027                ctx.indented(|ctx| {
1028                    writeln!(f, "{}Final closure:", ctx.indent)?;
1029                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1030                })?;
1031            }
1032        }
1033        Ok(())
1034    }
1035
1036    fn fmt_verbose_text(
1037        &self,
1038        f: &mut fmt::Formatter<'_>,
1039        ctx: &mut PlanRenderingContext<'_, Plan>,
1040    ) -> fmt::Result {
1041        let mode = HumanizedExplain::new(ctx.config.redacted);
1042        let plan = self;
1043        if let Some(closure) = plan.final_closure.as_ref() {
1044            if !closure.is_identity() {
1045                writeln!(f, "{}final_closure", ctx.indent)?;
1046                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1047            }
1048        }
1049        for (i, plan) in plan.stage_plans.iter().enumerate() {
1050            writeln!(f, "{}linear_stage[{}]", ctx.indent, i)?;
1051            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1052        }
1053        if let Some(closure) = plan.initial_closure.as_ref() {
1054            if !closure.is_identity() {
1055                writeln!(f, "{}initial_closure", ctx.indent)?;
1056                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1057            }
1058        }
1059        match &plan.source_key {
1060            Some(source_key) => {
1061                let source_key = mode.seq(source_key, None);
1062                let source_key = CompactScalars(source_key);
1063                writeln!(
1064                    f,
1065                    "{}source={{ relation={}, key=[{}] }}",
1066                    ctx.indent, &plan.source_relation, source_key
1067                )?
1068            }
1069            None => writeln!(
1070                f,
1071                "{}source={{ relation={}, key=[] }}",
1072                ctx.indent, &plan.source_relation
1073            )?,
1074        };
1075        Ok(())
1076    }
1077}
1078
1079impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearStagePlan {
1080    fn fmt_text(
1081        &self,
1082        f: &mut fmt::Formatter<'_>,
1083        ctx: &mut PlanRenderingContext<'_, Plan>,
1084    ) -> fmt::Result {
1085        if ctx.config.verbose_syntax {
1086            self.fmt_verbose_text(f, ctx)
1087        } else {
1088            self.fmt_default_text(f, ctx)
1089        }
1090    }
1091}
1092impl LinearStagePlan {
1093    #[allow(clippy::needless_pass_by_ref_mut)]
1094    fn fmt_default_text(
1095        &self,
1096        f: &mut fmt::Formatter<'_>,
1097        ctx: &mut PlanRenderingContext<'_, Plan>,
1098    ) -> fmt::Result {
1099        // NB this code path should not be live, as fmt_default_text for
1100        // `LinearJoinPlan` prints out each stage already
1101        let lookup_relation = &self.lookup_relation;
1102        if !self.lookup_key.is_empty() {
1103            let lookup_key = CompactScalarSeq(&self.lookup_key);
1104            writeln!(
1105                f,
1106                "{}Lookup key {lookup_key} in %{lookup_relation}",
1107                ctx.indent
1108            )
1109        } else {
1110            writeln!(f, "{}Lookup in %{lookup_relation}", ctx.indent)
1111        }
1112    }
1113
1114    fn fmt_verbose_text(
1115        &self,
1116        f: &mut fmt::Formatter<'_>,
1117        ctx: &mut PlanRenderingContext<'_, Plan>,
1118    ) -> fmt::Result {
1119        let mode = HumanizedExplain::new(ctx.config.redacted);
1120
1121        let plan = self;
1122        if !plan.closure.is_identity() {
1123            writeln!(f, "{}closure", ctx.indent)?;
1124            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1125        }
1126        {
1127            let lookup_relation = &plan.lookup_relation;
1128            let lookup_key = CompactScalarSeq(&plan.lookup_key);
1129            writeln!(
1130                f,
1131                "{}lookup={{ relation={}, key=[{}] }}",
1132                ctx.indent, lookup_relation, lookup_key
1133            )?;
1134        }
1135        {
1136            let stream_key = mode.seq(&plan.stream_key, None);
1137            let stream_key = CompactScalars(stream_key);
1138            let stream_thinning = Indices(&plan.stream_thinning);
1139            writeln!(
1140                f,
1141                "{}stream={{ key=[{}], thinning=({}) }}",
1142                ctx.indent, stream_key, stream_thinning
1143            )?;
1144        }
1145        Ok(())
1146    }
1147}
1148
1149impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaJoinPlan {
1150    fn fmt_text(
1151        &self,
1152        f: &mut fmt::Formatter<'_>,
1153        ctx: &mut PlanRenderingContext<'_, Plan>,
1154    ) -> fmt::Result {
1155        if ctx.config.verbose_syntax {
1156            self.fmt_verbose_text(f, ctx)
1157        } else {
1158            self.fmt_default_text(f, ctx)
1159        }
1160    }
1161}
1162impl DeltaJoinPlan {
1163    fn fmt_default_text(
1164        &self,
1165        f: &mut fmt::Formatter<'_>,
1166        ctx: &mut PlanRenderingContext<'_, Plan>,
1167    ) -> fmt::Result {
1168        for (i, plan) in self.path_plans.iter().enumerate() {
1169            writeln!(f, "{}Delta join path for input %{i}", ctx.indent)?;
1170            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1171        }
1172        Ok(())
1173    }
1174
1175    fn fmt_verbose_text(
1176        &self,
1177        f: &mut fmt::Formatter<'_>,
1178        ctx: &mut PlanRenderingContext<'_, Plan>,
1179    ) -> fmt::Result {
1180        for (i, plan) in self.path_plans.iter().enumerate() {
1181            writeln!(f, "{}plan_path[{}]", ctx.indent, i)?;
1182            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1183        }
1184        Ok(())
1185    }
1186}
1187
1188impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaPathPlan {
1189    fn fmt_text(
1190        &self,
1191        f: &mut fmt::Formatter<'_>,
1192        ctx: &mut PlanRenderingContext<'_, Plan>,
1193    ) -> fmt::Result {
1194        if ctx.config.verbose_syntax {
1195            self.fmt_verbose_text(f, ctx)
1196        } else {
1197            self.fmt_default_text(f, ctx)
1198        }
1199    }
1200}
1201
1202impl DeltaPathPlan {
1203    #[allow(clippy::needless_pass_by_ref_mut)]
1204    fn fmt_default_text(
1205        &self,
1206        f: &mut fmt::Formatter<'_>,
1207        ctx: &mut PlanRenderingContext<'_, Plan>,
1208    ) -> fmt::Result {
1209        for (
1210            i,
1211            DeltaStagePlan {
1212                lookup_key,
1213                lookup_relation,
1214                closure,
1215                ..
1216            },
1217        ) in self.stage_plans.iter().enumerate()
1218        {
1219            if !lookup_key.is_empty() {
1220                let lookup_key = CompactScalarSeq(lookup_key);
1221                writeln!(
1222                    f,
1223                    "{}stage {i} for %{lookup_relation}: lookup key {lookup_key}",
1224                    ctx.indent
1225                )?;
1226            } else {
1227                writeln!(f, "{}stage %{i} for  %{lookup_relation}", ctx.indent)?;
1228            }
1229            if closure.maps_or_filters() {
1230                ctx.indented(|ctx| closure.fmt_default_text(f, ctx))?;
1231            }
1232        }
1233        if let Some(final_closure) = &self.final_closure {
1234            if final_closure.maps_or_filters() {
1235                ctx.indented(|ctx| {
1236                    writeln!(f, "{}Final closure:", ctx.indent)?;
1237                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1238                })?;
1239            }
1240        }
1241        Ok(())
1242    }
1243
1244    fn fmt_verbose_text(
1245        &self,
1246        f: &mut fmt::Formatter<'_>,
1247        ctx: &mut PlanRenderingContext<'_, Plan>,
1248    ) -> fmt::Result {
1249        let mode = HumanizedExplain::new(ctx.config.redacted);
1250        let plan = self;
1251        if let Some(closure) = plan.final_closure.as_ref() {
1252            if !closure.is_identity() {
1253                writeln!(f, "{}final_closure", ctx.indent)?;
1254                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1255            }
1256        }
1257        for (i, plan) in plan.stage_plans.iter().enumerate().rev() {
1258            writeln!(f, "{}delta_stage[{}]", ctx.indent, i)?;
1259            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1260        }
1261        if !plan.initial_closure.is_identity() {
1262            writeln!(f, "{}initial_closure", ctx.indent)?;
1263            ctx.indented(|ctx| plan.initial_closure.fmt_text(f, ctx))?;
1264        }
1265        {
1266            let source_relation = &plan.source_relation;
1267            let source_key = mode.seq(&plan.source_key, None);
1268            let source_key = CompactScalars(source_key);
1269            writeln!(
1270                f,
1271                "{}source={{ relation={}, key=[{}] }}",
1272                ctx.indent, source_relation, source_key
1273            )?;
1274        }
1275        Ok(())
1276    }
1277}
1278
1279impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaStagePlan {
1280    fn fmt_text(
1281        &self,
1282        f: &mut fmt::Formatter<'_>,
1283        ctx: &mut PlanRenderingContext<'_, Plan>,
1284    ) -> fmt::Result {
1285        if ctx.config.verbose_syntax {
1286            self.fmt_verbose_text(f, ctx)
1287        } else {
1288            self.fmt_default_text(f, ctx)
1289        }
1290    }
1291}
1292impl DeltaStagePlan {
1293    #[allow(clippy::needless_pass_by_ref_mut)]
1294    fn fmt_default_text(
1295        &self,
1296        f: &mut fmt::Formatter<'_>,
1297        ctx: &mut PlanRenderingContext<'_, Plan>,
1298    ) -> fmt::Result {
1299        // NB this code path should not be live, as fmt_default_text for
1300        // `DeltaPathPlan` prints out each stage already
1301        let lookup_relation = &self.lookup_relation;
1302        let lookup_key = CompactScalarSeq(&self.lookup_key);
1303        writeln!(
1304            f,
1305            "{}Lookup key {lookup_key} in %{lookup_relation}",
1306            ctx.indent
1307        )
1308    }
1309
1310    fn fmt_verbose_text(
1311        &self,
1312        f: &mut fmt::Formatter<'_>,
1313        ctx: &mut PlanRenderingContext<'_, Plan>,
1314    ) -> fmt::Result {
1315        let mode = HumanizedExplain::new(ctx.config.redacted);
1316        let plan = self;
1317        if !plan.closure.is_identity() {
1318            writeln!(f, "{}closure", ctx.indent)?;
1319            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1320        }
1321        {
1322            let lookup_relation = &plan.lookup_relation;
1323            let lookup_key = mode.seq(&plan.lookup_key, None);
1324            let lookup_key = CompactScalars(lookup_key);
1325            writeln!(
1326                f,
1327                "{}lookup={{ relation={}, key=[{}] }}",
1328                ctx.indent, lookup_relation, lookup_key
1329            )?;
1330        }
1331        {
1332            let stream_key = mode.seq(&plan.stream_key, None);
1333            let stream_key = CompactScalars(stream_key);
1334            let stream_thinning = mode.seq(&plan.stream_thinning, None);
1335            let stream_thinning = CompactScalars(stream_thinning);
1336            writeln!(
1337                f,
1338                "{}stream={{ key=[{}], thinning=({}) }}",
1339                ctx.indent, stream_key, stream_thinning
1340            )?;
1341        }
1342        Ok(())
1343    }
1344}
1345
1346impl DisplayText<PlanRenderingContext<'_, Plan>> for JoinClosure {
1347    fn fmt_text(
1348        &self,
1349        f: &mut fmt::Formatter<'_>,
1350        ctx: &mut PlanRenderingContext<'_, Plan>,
1351    ) -> fmt::Result {
1352        if ctx.config.verbose_syntax {
1353            self.fmt_verbose_text(f, ctx)
1354        } else {
1355            self.fmt_default_text(f, ctx)
1356        }
1357    }
1358}
1359impl JoinClosure {
1360    fn fmt_default_text(
1361        &self,
1362        f: &mut fmt::Formatter<'_>,
1363        ctx: &mut PlanRenderingContext<'_, Plan>,
1364    ) -> fmt::Result {
1365        let mode = HumanizedExplain::new(ctx.config.redacted);
1366        if !self.before.expressions.is_empty() || !self.before.predicates.is_empty() {
1367            mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1368        }
1369        if !self.ready_equivalences.is_empty() {
1370            let equivalences = separated(
1371                " AND ",
1372                self.ready_equivalences
1373                    .iter()
1374                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1375            );
1376            writeln!(f, "{}Equivalences: {equivalences}", ctx.indent)?;
1377        }
1378        Ok(())
1379    }
1380
1381    fn fmt_verbose_text(
1382        &self,
1383        f: &mut fmt::Formatter<'_>,
1384        ctx: &mut PlanRenderingContext<'_, Plan>,
1385    ) -> fmt::Result {
1386        let mode = HumanizedExplain::new(ctx.config.redacted);
1387        mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1388        if !self.ready_equivalences.is_empty() {
1389            let equivalences = separated(
1390                " AND ",
1391                self.ready_equivalences
1392                    .iter()
1393                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1394            );
1395            writeln!(f, "{}ready_equivalences={}", ctx.indent, equivalences)?;
1396        }
1397        Ok(())
1398    }
1399}
1400
1401impl DisplayText<PlanRenderingContext<'_, Plan>> for AccumulablePlan {
1402    fn fmt_text(
1403        &self,
1404        f: &mut fmt::Formatter<'_>,
1405        ctx: &mut PlanRenderingContext<'_, Plan>,
1406    ) -> fmt::Result {
1407        if ctx.config.verbose_syntax {
1408            self.fmt_verbose_text(f, ctx)
1409        } else {
1410            self.fmt_default_text(f, ctx)
1411        }
1412    }
1413}
1414impl AccumulablePlan {
1415    #[allow(clippy::needless_pass_by_ref_mut)]
1416    fn fmt_default_text(
1417        &self,
1418        f: &mut fmt::Formatter<'_>,
1419        ctx: &mut PlanRenderingContext<'_, Plan>,
1420    ) -> fmt::Result {
1421        let mode = HumanizedExplain::new(ctx.config.redacted);
1422
1423        if !self.simple_aggrs.is_empty() {
1424            let simple_aggrs = self
1425                .simple_aggrs
1426                .iter()
1427                .map(|(_i_aggs, _i_datum, agg)| mode.expr(agg, None));
1428            let simple_aggrs = separated(", ", simple_aggrs);
1429            writeln!(f, "{}Simple aggregates: {simple_aggrs}", ctx.indent)?;
1430        }
1431
1432        if !self.distinct_aggrs.is_empty() {
1433            let distinct_aggrs = self
1434                .distinct_aggrs
1435                .iter()
1436                .map(|(_i_aggs, _i_datum, agg)| mode.expr(agg, None));
1437            let distinct_aggrs = separated(", ", distinct_aggrs);
1438            writeln!(f, "{}Distinct aggregates: {distinct_aggrs}", ctx.indent)?;
1439        }
1440        Ok(())
1441    }
1442
1443    #[allow(clippy::needless_pass_by_ref_mut)]
1444    fn fmt_verbose_text(
1445        &self,
1446        f: &mut fmt::Formatter<'_>,
1447        ctx: &mut PlanRenderingContext<'_, Plan>,
1448    ) -> fmt::Result {
1449        let mode = HumanizedExplain::new(ctx.config.redacted);
1450        // full_aggrs (skipped because they are repeated in simple_aggrs ∪ distinct_aggrs)
1451        // for (i, aggr) in self.full_aggrs.iter().enumerate() {
1452        //     write!(f, "{}full_aggrs[{}]=", ctx.indent, i)?;
1453        //     aggr.fmt_text(f, &mut ())?;
1454        //     writeln!(f)?;
1455        // }
1456        // simple_aggrs
1457        for (i, (i_aggs, i_datum, agg)) in self.simple_aggrs.iter().enumerate() {
1458            let agg = mode.expr(agg, None);
1459            write!(f, "{}simple_aggrs[{}]=", ctx.indent, i)?;
1460            writeln!(f, "({}, {}, {})", i_aggs, i_datum, agg)?;
1461        }
1462        // distinct_aggrs
1463        for (i, (i_aggs, i_datum, agg)) in self.distinct_aggrs.iter().enumerate() {
1464            let agg = mode.expr(agg, None);
1465            write!(f, "{}distinct_aggrs[{}]=", ctx.indent, i)?;
1466            writeln!(f, "({}, {}, {})", i_aggs, i_datum, agg)?;
1467        }
1468        Ok(())
1469    }
1470}
1471
1472impl DisplayText<PlanRenderingContext<'_, Plan>> for HierarchicalPlan {
1473    fn fmt_text(
1474        &self,
1475        f: &mut fmt::Formatter<'_>,
1476        ctx: &mut PlanRenderingContext<'_, Plan>,
1477    ) -> fmt::Result {
1478        if ctx.config.verbose_syntax {
1479            self.fmt_verbose_text(f, ctx)
1480        } else {
1481            self.fmt_default_text(f, ctx)
1482        }
1483    }
1484}
1485impl HierarchicalPlan {
1486    #[allow(clippy::needless_pass_by_ref_mut)]
1487    fn fmt_default_text(
1488        &self,
1489        f: &mut fmt::Formatter<'_>,
1490        ctx: &mut PlanRenderingContext<'_, Plan>,
1491    ) -> fmt::Result {
1492        let mode = HumanizedExplain::new(ctx.config.redacted);
1493        let aggr_funcs = mode.seq(self.aggr_funcs(), None);
1494        let aggr_funcs = separated(", ", aggr_funcs);
1495        writeln!(f, "{}Aggregations: {aggr_funcs}", ctx.indent)
1496    }
1497
1498    #[allow(clippy::needless_pass_by_ref_mut)]
1499    fn fmt_verbose_text(
1500        &self,
1501        f: &mut fmt::Formatter<'_>,
1502        ctx: &mut PlanRenderingContext<'_, Plan>,
1503    ) -> fmt::Result {
1504        let mode = HumanizedExplain::new(ctx.config.redacted);
1505        match self {
1506            HierarchicalPlan::Monotonic(plan) => {
1507                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1508                let aggr_funcs = separated(", ", aggr_funcs);
1509                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1510                let skips = separated(", ", &plan.skips);
1511                writeln!(f, "{}skips=[{}]", ctx.indent, skips)?;
1512                writeln!(f, "{}monotonic", ctx.indent)?;
1513                if plan.must_consolidate {
1514                    writeln!(f, "{}must_consolidate", ctx.indent)?;
1515                }
1516            }
1517            HierarchicalPlan::Bucketed(plan) => {
1518                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1519                let aggr_funcs = separated(", ", aggr_funcs);
1520                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1521                let skips = separated(", ", &plan.skips);
1522                writeln!(f, "{}skips=[{}]", ctx.indent, skips)?;
1523                let buckets = separated(", ", &plan.buckets);
1524                writeln!(f, "{}buckets=[{}]", ctx.indent, buckets)?;
1525            }
1526        }
1527        Ok(())
1528    }
1529}
1530
1531impl DisplayText<PlanRenderingContext<'_, Plan>> for BasicPlan {
1532    fn fmt_text(
1533        &self,
1534        f: &mut fmt::Formatter<'_>,
1535        ctx: &mut PlanRenderingContext<'_, Plan>,
1536    ) -> fmt::Result {
1537        if ctx.config.verbose_syntax {
1538            self.fmt_verbose_text(f, ctx)
1539        } else {
1540            self.fmt_default_text(f, ctx)
1541        }
1542    }
1543}
1544impl BasicPlan {
1545    #[allow(clippy::needless_pass_by_ref_mut)]
1546    fn fmt_default_text(
1547        &self,
1548        f: &mut fmt::Formatter<'_>,
1549        ctx: &mut PlanRenderingContext<'_, Plan>,
1550    ) -> fmt::Result {
1551        let mode = HumanizedExplain::new(ctx.config.redacted);
1552        match self {
1553            BasicPlan::Single(SingleBasicPlan {
1554                index: _,
1555                expr,
1556                fused_unnest_list: _,
1557            }) => {
1558                let agg = mode.expr(expr, None);
1559                writeln!(f, "{}Aggregation: {agg}", ctx.indent)?;
1560            }
1561            BasicPlan::Multiple(aggs) => {
1562                let mode = HumanizedExplain::new(ctx.config.redacted);
1563                write!(f, "{}Aggregations:", ctx.indent)?;
1564
1565                for (_, agg) in aggs.iter() {
1566                    let agg = mode.expr(agg, None);
1567                    write!(f, " {agg}")?;
1568                }
1569                writeln!(f)?;
1570            }
1571        }
1572        Ok(())
1573    }
1574
1575    #[allow(clippy::needless_pass_by_ref_mut)]
1576    fn fmt_verbose_text(
1577        &self,
1578        f: &mut fmt::Formatter<'_>,
1579        ctx: &mut PlanRenderingContext<'_, Plan>,
1580    ) -> fmt::Result {
1581        let mode = HumanizedExplain::new(ctx.config.redacted);
1582        match self {
1583            BasicPlan::Single(SingleBasicPlan {
1584                index,
1585                expr,
1586                fused_unnest_list,
1587            }) => {
1588                let agg = mode.expr(expr, None);
1589                let fused_unnest_list = if *fused_unnest_list {
1590                    ", fused_unnest_list=true"
1591                } else {
1592                    ""
1593                };
1594                writeln!(
1595                    f,
1596                    "{}aggr=({}, {}{})",
1597                    ctx.indent, index, agg, fused_unnest_list
1598                )?;
1599            }
1600            BasicPlan::Multiple(aggs) => {
1601                for (i, (i_datum, agg)) in aggs.iter().enumerate() {
1602                    let agg = mode.expr(agg, None);
1603                    writeln!(f, "{}aggrs[{}]=({}, {})", ctx.indent, i, i_datum, agg)?;
1604                }
1605            }
1606        }
1607        Ok(())
1608    }
1609}
1610
1611impl DisplayText<PlanRenderingContext<'_, Plan>> for CollationPlan {
1612    fn fmt_text(
1613        &self,
1614        f: &mut fmt::Formatter<'_>,
1615        ctx: &mut PlanRenderingContext<'_, Plan>,
1616    ) -> fmt::Result {
1617        if ctx.config.verbose_syntax {
1618            self.fmt_verbose_text(f, ctx)
1619        } else {
1620            self.fmt_default_text(f, ctx)
1621        }
1622    }
1623}
1624
1625impl CollationPlan {
1626    #[allow(clippy::needless_pass_by_ref_mut)]
1627    fn fmt_default_text(
1628        &self,
1629        f: &mut fmt::Formatter<'_>,
1630        ctx: &mut PlanRenderingContext<'_, Plan>,
1631    ) -> fmt::Result {
1632        if let Some(plan) = &self.accumulable {
1633            writeln!(f, "{}Accumulable sub-aggregation", ctx.indent)?;
1634            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1635        }
1636        if let Some(plan) = &self.hierarchical {
1637            writeln!(f, "{}Hierarchical sub-aggregation", ctx.indent)?;
1638            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1639        }
1640        if let Some(plan) = &self.basic {
1641            writeln!(f, "{}Non-incremental sub-aggregation", ctx.indent)?;
1642            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1643        }
1644        Ok(())
1645    }
1646
1647    fn fmt_verbose_text(
1648        &self,
1649        f: &mut fmt::Formatter<'_>,
1650        ctx: &mut PlanRenderingContext<'_, Plan>,
1651    ) -> fmt::Result {
1652        {
1653            use crate::plan::reduce::ReductionType;
1654            let aggregate_types = &self
1655                .aggregate_types
1656                .iter()
1657                .map(|reduction_type| match reduction_type {
1658                    ReductionType::Accumulable => "a".to_string(),
1659                    ReductionType::Hierarchical => "h".to_string(),
1660                    ReductionType::Basic => "b".to_string(),
1661                })
1662                .collect::<Vec<_>>();
1663            let aggregate_types = separated(", ", aggregate_types);
1664            writeln!(f, "{}aggregate_types=[{}]", ctx.indent, aggregate_types)?;
1665        }
1666        if let Some(plan) = &self.accumulable {
1667            writeln!(f, "{}accumulable", ctx.indent)?;
1668            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1669        }
1670        if let Some(plan) = &self.hierarchical {
1671            writeln!(f, "{}hierarchical", ctx.indent)?;
1672            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1673        }
1674        if let Some(plan) = &self.basic {
1675            writeln!(f, "{}basic", ctx.indent)?;
1676            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1677        }
1678        Ok(())
1679    }
1680}
1681
1682/// Helper struct for rendering an arrangement.
1683struct Arrangement<'a> {
1684    key: &'a Vec<MirScalarExpr>,
1685    permutation: Permutation<'a>,
1686    thinning: &'a Vec<usize>,
1687}
1688
1689impl<'a> From<&'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> for Arrangement<'a> {
1690    fn from(
1691        (key, permutation, thinning): &'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
1692    ) -> Self {
1693        Arrangement {
1694            key,
1695            permutation: Permutation(permutation),
1696            thinning,
1697        }
1698    }
1699}
1700
1701impl<'a> DisplayText<PlanRenderingContext<'_, Plan>> for Arrangement<'a> {
1702    fn fmt_text(
1703        &self,
1704        f: &mut fmt::Formatter<'_>,
1705        ctx: &mut PlanRenderingContext<'_, Plan>,
1706    ) -> fmt::Result {
1707        if ctx.config.verbose_syntax {
1708            self.fmt_verbose_text(f, ctx)
1709        } else {
1710            self.fmt_default_text(f, ctx)
1711        }
1712    }
1713}
1714
1715impl<'a> Arrangement<'a> {
1716    #[allow(clippy::needless_pass_by_ref_mut)]
1717    fn fmt_default_text(
1718        &self,
1719        f: &mut fmt::Formatter<'_>,
1720        ctx: &PlanRenderingContext<'_, Plan>,
1721    ) -> fmt::Result {
1722        let mode = HumanizedExplain::new(ctx.config.redacted);
1723        if !self.key.is_empty() {
1724            let key = mode.seq(self.key, None);
1725            let key = CompactScalars(key);
1726            write!(f, "{key}")
1727        } else {
1728            write!(f, "(empty key)")
1729        }
1730    }
1731
1732    #[allow(clippy::needless_pass_by_ref_mut)]
1733    fn fmt_verbose_text(
1734        &self,
1735        f: &mut fmt::Formatter<'_>,
1736        ctx: &PlanRenderingContext<'_, Plan>,
1737    ) -> fmt::Result {
1738        let mode = HumanizedExplain::new(ctx.config.redacted);
1739        // prepare key
1740        let key = mode.seq(self.key, None);
1741        let key = CompactScalars(key);
1742        // prepare perumation map
1743        let permutation = &self.permutation;
1744        // prepare thinning
1745        let thinning = Indices(self.thinning);
1746        // write the arrangement spec
1747        write!(
1748            f,
1749            "{{ key=[{}], permutation={}, thinning=({}) }}",
1750            key, permutation, thinning
1751        )
1752    }
1753}
1754
/// Helper struct for rendering a permutation.
///
/// The wrapped vector maps each position (its index) to a column number (its
/// value).
struct Permutation<'a>(&'a Vec<usize>);

/// Renders the permutation compactly.
///
/// Only positions whose value differs from their own index are listed, as
/// `{#pos: #val, ...}` in ascending position order. When no position differs
/// (which includes the empty permutation) the output is the literal `id`.
impl<'a> fmt::Display for Permutation<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Stream the non-identity entries straight into the formatter rather
        // than collecting intermediate `String`s.
        let mut moved = self
            .0
            .iter()
            .enumerate()
            .filter(|&(from, &to)| from != to)
            .peekable();

        // Nothing moved: this is the identity permutation.
        if moved.peek().is_none() {
            return f.write_str("id");
        }

        f.write_str("{")?;
        for (pos, (from, to)) in moved.enumerate() {
            if pos > 0 {
                f.write_str(", ")?;
            }
            write!(f, "#{from}: #{to}")?;
        }
        f.write_str("}")
    }
}
1772
1773/// Annotations for physical plans.
1774struct PlanAnnotations {
1775    config: ExplainConfig,
1776    node_id: LirId,
1777}
1778
1779// The current implementation deviates from the `AnnotatedPlan` used in `Mir~`-based plans. This is
1780// fine, since at the moment the only attribute we are going to explain is the `node_id`, which at
1781// the moment is kept inline with the `Plan` variants. If at some point in the future we want to
1782// start deriving and printing attributes that are derived ad-hoc, however, we might want to adopt
1783// `AnnotatedPlan` here as well.
1784impl PlanAnnotations {
1785    fn new(config: ExplainConfig, plan: &Plan) -> Self {
1786        let node_id = plan.lir_id;
1787        Self { config, node_id }
1788    }
1789}
1790
1791impl fmt::Display for PlanAnnotations {
1792    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1793        if self.config.node_ids {
1794            f.debug_struct(" //")
1795                .field("node_id", &self.node_id)
1796                .finish()
1797        } else {
1798            // No physical plan annotations enabled.
1799            Ok(())
1800        }
1801    }
1802}