Skip to main content

mz_compute_types/explain/
text.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! `EXPLAIN ... AS TEXT` support for LIR structures.
11//!
12//! The format adheres to the following conventions:
13//! 1. In general, every line that starts with an uppercase character
14//!    corresponds to a [`Plan`] variant.
15//! 2. Whenever the variant has an attached `~Plan`, the printed name is
16//!    `$V::$P` where `$V` identifies the variant and `$P` the plan.
17//! 3. The fields of a `~Plan` struct attached to a [`Plan`] are rendered as if
18//!    they were part of the variant themself.
19//! 4. Non-recursive parameters of each sub-plan are written as `$key=$val`
20//!    pairs on the same line or as lowercase `$key` fields on indented lines.
21//! 5. A single non-recursive parameter can be written just as `$val`.
22
23use std::fmt;
24use std::ops::Deref;
25
26use itertools::Itertools;
27use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
28use mz_expr::{Id, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_ore::str::{IndentLike, StrExt, separated};
31use mz_repr::explain::text::DisplayText;
32use mz_repr::explain::{
33    CompactScalarSeq, CompactScalars, ExplainConfig, Indices, PlanRenderingContext,
34};
35
36use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
37use crate::plan::join::linear_join::LinearStagePlan;
38use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
39use crate::plan::reduce::{
40    AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, MonotonicPlan, SingleBasicPlan,
41};
42use crate::plan::threshold::ThresholdPlan;
43use crate::plan::{AvailableCollections, LirId, Plan, PlanNode};
44
45impl DisplayText<PlanRenderingContext<'_, Plan>> for Plan {
46    fn fmt_text(
47        &self,
48        f: &mut fmt::Formatter<'_>,
49        ctx: &mut PlanRenderingContext<'_, Plan>,
50    ) -> fmt::Result {
51        if ctx.config.verbose_syntax {
52            self.fmt_verbose_text(f, ctx)
53        } else {
54            self.fmt_default_text(f, ctx)
55        }
56    }
57}
58
59impl Plan {
60    // NOTE: This code needs to be kept in sync with the `Display` instance for
61    // `RenderPlan:ExprHumanizer`.
62    //
63    // This code determines what you see in `EXPLAIN`; that other code
64    // determine what you see when you run `mz_lir_mapping`.
65    fn fmt_default_text(
66        &self,
67        f: &mut fmt::Formatter<'_>,
68        ctx: &mut PlanRenderingContext<'_, Plan>,
69    ) -> fmt::Result {
70        use PlanNode::*;
71
72        let mode = HumanizedExplain::new(ctx.config.redacted);
73        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
74
75        match &self.node {
76            Constant { rows } => {
77                write!(f, "{}→Constant ", ctx.indent)?;
78
79                match rows {
80                    Ok(rows) => write!(
81                        f,
82                        "({} row{})",
83                        rows.len(),
84                        if rows.len() == 1 { "" } else { "s" }
85                    )?,
86                    Err(err) => {
87                        if mode.redacted() {
88                            write!(f, "(error: █)")?;
89                        } else {
90                            write!(f, "(error: {})", err.to_string().quoted(),)?;
91                        }
92                    }
93                }
94
95                writeln!(f, "{annotations}")?;
96            }
97            Get { id, keys, plan } => {
98                ctx.indent.set(); // mark the current indent level
99
100                // Resolve the id as a string.
101                let id = match id {
102                    Id::Local(id) => id.to_string(),
103                    Id::Global(id) => ctx
104                        .humanizer
105                        .humanize_id(*id)
106                        .unwrap_or_else(|| id.to_string()),
107                };
108                // Render plan-specific fields.
109                use crate::plan::GetPlan;
110                match plan {
111                    GetPlan::PassArrangements => {
112                        if keys.raw && keys.arranged.is_empty() {
113                            writeln!(f, "{}→Stream {id}{annotations}", ctx.indent)?;
114                        } else {
115                            // we're not reporting on whether or not `raw` is set
116                            // we're not reporting on how many arrangements there are
117                            writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
118                        }
119                    }
120                    GetPlan::Arrangement(key, Some(val), mfp) => {
121                        if !mfp.is_identity() {
122                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
123                            ctx.indent += 1;
124                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
125                            ctx.indent += 1;
126                        }
127
128                        writeln!(f, "{}→Index Lookup on {id}{annotations}", ctx.indent)?;
129                        ctx.indent += 1;
130                        let key = CompactScalars(mode.seq(key, None));
131                        write!(f, "{}Key: ({key}) ", ctx.indent)?;
132                        let val = mode.expr(val, None);
133                        writeln!(f, "Value: {val}")?;
134                    }
135                    GetPlan::Arrangement(key, None, mfp) => {
136                        if !mfp.is_identity() {
137                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
138                            ctx.indent += 1;
139                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
140                            ctx.indent += 1;
141                        }
142
143                        writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
144                        ctx.indent += 1;
145                        let key = CompactScalars(mode.seq(key, None));
146                        writeln!(f, "{}Key: ({key})", ctx.indent)?;
147                    }
148                    GetPlan::Collection(mfp) => {
149                        if !mfp.is_identity() {
150                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
151                            ctx.indent += 1;
152                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
153                            ctx.indent += 1;
154                        }
155
156                        writeln!(f, "{}→Read {id}{annotations}", ctx.indent)?;
157                    }
158                }
159                ctx.indent.reset(); // reset the original indent level
160            }
161            Let { id, value, body } => {
162                let mut bindings = vec![(id, value.as_ref())];
163                let mut head = body.as_ref();
164
165                // Render Let-blocks nested in the body an outer Let-block in one step
166                // with a flattened list of bindings
167                while let Let { id, value, body } = &head.node {
168                    bindings.push((id, value.as_ref()));
169                    head = body.as_ref();
170                }
171
172                writeln!(f, "{}→With", ctx.indent)?;
173                ctx.indented(|ctx| {
174                    for (id, value) in bindings.iter() {
175                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
176                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
177                    }
178                    Ok(())
179                })?;
180                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
181                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
182            }
183            LetRec {
184                ids,
185                values,
186                limits,
187                body,
188            } => {
189                let head = body.as_ref();
190
191                writeln!(f, "{}→With Mutually Recursive", ctx.indent)?;
192                ctx.indented(|ctx| {
193                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
194                    for ((id, value), limit) in bindings {
195                        if let Some(limit) = limit {
196                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
197                        } else {
198                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
199                        }
200                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
201                    }
202                    Ok(())
203                })?;
204                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
205                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
206            }
207            Mfp {
208                input,
209                mfp,
210                input_key_val: _,
211            } => {
212                writeln!(f, "{}→Map/Filter/Project{annotations}", ctx.indent)?;
213                ctx.indent.set();
214
215                ctx.indent += 1;
216                mode.expr(mfp, None).fmt_default_text(f, ctx)?;
217
218                // one more nesting level if we showed anything for the MFP
219                if !mfp.is_identity() {
220                    ctx.indent += 1;
221                }
222                input.fmt_text(f, ctx)?;
223                ctx.indent.reset();
224            }
225            FlatMap {
226                input_key: _,
227                input,
228                exprs,
229                func,
230                mfp_after,
231            } => {
232                ctx.indent.set();
233                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
234                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
235                    ctx.indent += 1;
236                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
237                    ctx.indent += 1;
238                }
239
240                let exprs = mode.seq(exprs, None);
241                let exprs = CompactScalars(exprs);
242                writeln!(
243                    f,
244                    "{}→Table Function {func}({exprs}){annotations}",
245                    ctx.indent
246                )?;
247                ctx.indent += 1;
248
249                input.fmt_text(f, ctx)?;
250
251                ctx.indent.reset();
252            }
253            Join { inputs, plan } => {
254                use crate::plan::join::JoinPlan;
255                match plan {
256                    JoinPlan::Linear(plan) => {
257                        write!(f, "{}→Differential Join", ctx.indent)?;
258                        write!(f, " %{}", plan.source_relation)?;
259                        for dsp in &plan.stage_plans {
260                            write!(f, " » %{}", dsp.lookup_relation)?;
261                        }
262                        writeln!(f, "{annotations}")?;
263                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
264                    }
265                    JoinPlan::Delta(plan) => {
266                        write!(f, "{}→Delta Join", ctx.indent)?;
267                        for dpp in &plan.path_plans {
268                            write!(f, " [%{}", dpp.source_relation)?;
269
270                            for dsp in &dpp.stage_plans {
271                                write!(f, " » %{}", dsp.lookup_relation)?;
272                            }
273                            write!(f, "]")?;
274                        }
275                        writeln!(f, "{annotations}")?;
276                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
277                    }
278                }
279
280                ctx.indented(|ctx| {
281                    for input in inputs {
282                        input.fmt_text(f, ctx)?;
283                    }
284                    Ok(())
285                })?;
286            }
287            Reduce {
288                input_key: _,
289                input,
290                key_val_plan,
291                plan,
292                mfp_after,
293            } => {
294                ctx.indent.set();
295                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
296                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
297                    ctx.indent += 1;
298                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
299                    ctx.indent += 1;
300                }
301
302                use crate::plan::reduce::ReducePlan;
303                match plan {
304                    ReducePlan::Distinct => {
305                        writeln!(f, "{}→Distinct GroupAggregate{annotations}", ctx.indent)?;
306                    }
307                    ReducePlan::Accumulable(plan) => {
308                        writeln!(f, "{}→Accumulable GroupAggregate{annotations}", ctx.indent)?;
309                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
310                    }
311                    ReducePlan::Hierarchical(
312                        plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }),
313                    ) => {
314                        write!(
315                            f,
316                            "{}→Bucketed Hierarchical GroupAggregate (buckets: ",
317                            ctx.indent
318                        )?;
319                        for bucket in buckets {
320                            write!(f, " {bucket}")?;
321                        }
322                        writeln!(f, "){annotations}")?;
323                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
324                    }
325                    ReducePlan::Hierarchical(
326                        plan @ HierarchicalPlan::Monotonic(MonotonicPlan {
327                            must_consolidate, ..
328                        }),
329                    ) => {
330                        write!(f, "{}→", ctx.indent)?;
331                        if *must_consolidate {
332                            write!(f, "Consolidating ")?;
333                        }
334                        writeln!(f, "Monotonic GroupAggregate{annotations}",)?;
335                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
336                    }
337                    ReducePlan::Basic(plan) => {
338                        ctx.indent.set();
339                        if let BasicPlan::Single(SingleBasicPlan {
340                            fused_unnest_list, ..
341                        }) = &plan
342                        {
343                            if *fused_unnest_list {
344                                writeln!(
345                                    f,
346                                    "{}→Fused with Child Table Function unnest_list",
347                                    ctx.indent
348                                )?;
349                                ctx.indent += 1;
350                            }
351                        }
352                        writeln!(
353                            f,
354                            "{}→Non-incremental GroupAggregate{annotations}",
355                            ctx.indent
356                        )?;
357                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
358                        ctx.indent.reset();
359                    }
360                }
361
362                ctx.indented(|ctx| {
363                    let kvp = key_val_plan.key_plan.deref();
364                    if !kvp.is_identity() {
365                        writeln!(f, "{}Key:", ctx.indent)?;
366                        ctx.indented(|ctx| {
367                            let key_plan = mode.expr(kvp, None);
368                            key_plan.fmt_default_text(f, ctx)
369                        })?;
370                    }
371
372                    input.fmt_text(f, ctx)
373                })?;
374
375                ctx.indent.reset();
376            }
377            TopK { input, top_k_plan } => {
378                use crate::plan::top_k::TopKPlan;
379                match top_k_plan {
380                    TopKPlan::MonotonicTop1(plan) => {
381                        write!(f, "{}→", ctx.indent)?;
382                        if plan.must_consolidate {
383                            write!(f, "Consolidating ")?;
384                        }
385                        writeln!(f, "Monotonic Top1{annotations}")?;
386
387                        ctx.indented(|ctx| {
388                            if plan.group_key.len() > 0 {
389                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
390                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
391                            }
392                            if plan.order_key.len() > 0 {
393                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
394                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
395                            }
396                            Ok(())
397                        })?;
398                    }
399                    TopKPlan::MonotonicTopK(plan) => {
400                        write!(f, "{}→", ctx.indent)?;
401                        if plan.must_consolidate {
402                            write!(f, "Consolidating ")?;
403                        }
404                        writeln!(f, "Monotonic TopK{annotations}")?;
405
406                        ctx.indented(|ctx| {
407                            if plan.group_key.len() > 0 {
408                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
409                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
410                            }
411                            if plan.order_key.len() > 0 {
412                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
413                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
414                            }
415                            if let Some(limit) = &plan.limit {
416                                let limit = mode.expr(limit, None);
417                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
418                            }
419                            Ok(())
420                        })?;
421                    }
422                    TopKPlan::Basic(plan) => {
423                        writeln!(f, "{}→Non-monotonic TopK{annotations}", ctx.indent)?;
424
425                        ctx.indented(|ctx| {
426                            if plan.group_key.len() > 0 {
427                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
428                                writeln!(f, "{}Group By{group_by}", ctx.indent)?;
429                            }
430                            if plan.order_key.len() > 0 {
431                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
432                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
433                            }
434                            if let Some(limit) = &plan.limit {
435                                let limit = mode.expr(limit, None);
436                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
437                            }
438                            if plan.offset != 0 {
439                                let offset = plan.offset;
440                                writeln!(f, "{}Offset {offset}", ctx.indent)?;
441                            }
442                            Ok(())
443                        })?;
444                    }
445                }
446
447                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
448            }
449            Negate { input } => {
450                writeln!(f, "{}→Negate Diffs{annotations}", ctx.indent)?;
451
452                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
453            }
454            Threshold {
455                input,
456                threshold_plan,
457            } => {
458                match threshold_plan {
459                    ThresholdPlan::Basic(plan) => {
460                        write!(f, "{}→Threshold Diffs ", ctx.indent)?;
461                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
462                        ensure_arrangement.fmt_text(f, ctx)?;
463                        writeln!(f, "{annotations}")?;
464                    }
465                };
466
467                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
468            }
469            Union {
470                inputs,
471                consolidate_output,
472            } => {
473                write!(f, "{}→", ctx.indent)?;
474                if *consolidate_output {
475                    write!(f, "Consolidating ")?;
476                }
477                writeln!(f, "Union{annotations}")?;
478
479                ctx.indented(|ctx| {
480                    for input in inputs.iter() {
481                        input.fmt_text(f, ctx)?;
482                    }
483                    Ok(())
484                })?;
485            }
486            ArrangeBy {
487                input_key: _,
488                input,
489                input_mfp,
490                forms,
491            } => {
492                ctx.indent.set();
493                if forms.raw && forms.arranged.is_empty() {
494                    soft_assert_or_log!(forms.raw, "raw stream with no arrangements");
495                    writeln!(f, "{}→Unarranged Raw Stream{annotations}", ctx.indent)?;
496                } else {
497                    write!(f, "{}→Arrange", ctx.indent)?;
498
499                    if !forms.arranged.is_empty() {
500                        let mode = HumanizedExplain::new(ctx.config.redacted);
501                        for (key, _, _) in &forms.arranged {
502                            if !key.is_empty() {
503                                let key = mode.seq(key, None);
504                                let key = CompactScalars(key);
505                                write!(f, " ({key})")?;
506                            } else {
507                                write!(f, " (empty key)")?;
508                            }
509                        }
510                    }
511                    writeln!(f, "{annotations}")?;
512                }
513
514                if !input_mfp.is_identity() {
515                    ctx.indent += 1;
516                    writeln!(f, "{}→Fused with Parent Map/Filter/Project", ctx.indent)?;
517                    ctx.indented(|ctx| mode.expr(input_mfp, None).fmt_default_text(f, ctx))?;
518                }
519
520                ctx.indent += 1;
521                input.fmt_text(f, ctx)?;
522                ctx.indent.reset();
523            }
524        }
525
526        Ok(())
527    }
528
529    fn fmt_verbose_text(
530        &self,
531        f: &mut fmt::Formatter<'_>,
532        ctx: &mut PlanRenderingContext<'_, Plan>,
533    ) -> fmt::Result {
534        use PlanNode::*;
535
536        let mode = HumanizedExplain::new(ctx.config.redacted);
537        let annotations = PlanAnnotations::new(ctx.config.clone(), self);
538
539        match &self.node {
540            Constant { rows } => match rows {
541                Ok(rows) => {
542                    if !rows.is_empty() {
543                        writeln!(f, "{}Constant{}", ctx.indent, annotations)?;
544                        ctx.indented(|ctx| {
545                            fmt_text_constant_rows(
546                                f,
547                                rows.iter().map(|(data, _, diff)| (data, diff)),
548                                &mut ctx.indent,
549                                ctx.config.redacted,
550                            )
551                        })?;
552                    } else {
553                        writeln!(f, "{}Constant <empty>{}", ctx.indent, annotations)?;
554                    }
555                }
556                Err(err) => {
557                    if mode.redacted() {
558                        writeln!(f, "{}Error █{}", ctx.indent, annotations)?;
559                    } else {
560                        {
561                            writeln!(
562                                f,
563                                "{}Error {}{}",
564                                ctx.indent,
565                                err.to_string().quoted(),
566                                annotations
567                            )?;
568                        }
569                    }
570                }
571            },
572
573            Get { id, keys, plan } => {
574                ctx.indent.set(); // mark the current indent level
575
576                // Resolve the id as a string.
577                let id = match id {
578                    Id::Local(id) => id.to_string(),
579                    Id::Global(id) => ctx
580                        .humanizer
581                        .humanize_id(*id)
582                        .unwrap_or_else(|| id.to_string()),
583                };
584                // Render plan-specific fields.
585                use crate::plan::GetPlan;
586                match plan {
587                    GetPlan::PassArrangements => {
588                        writeln!(
589                            f,
590                            "{}Get::PassArrangements {}{}",
591                            ctx.indent, id, annotations
592                        )?;
593                        ctx.indent += 1;
594                    }
595                    GetPlan::Arrangement(key, val, mfp) => {
596                        writeln!(f, "{}Get::Arrangement {}{}", ctx.indent, id, annotations)?;
597                        ctx.indent += 1;
598                        mode.expr(mfp, None).fmt_text(f, ctx)?;
599                        {
600                            let key = mode.seq(key, None);
601                            let key = CompactScalars(key);
602                            writeln!(f, "{}key={}", ctx.indent, key)?;
603                        }
604                        if let Some(val) = val {
605                            let val = mode.expr(val, None);
606                            writeln!(f, "{}val={}", ctx.indent, val)?;
607                        }
608                    }
609                    GetPlan::Collection(mfp) => {
610                        writeln!(f, "{}Get::Collection {}{}", ctx.indent, id, annotations)?;
611                        ctx.indent += 1;
612                        mode.expr(mfp, None).fmt_text(f, ctx)?;
613                    }
614                }
615
616                // Render plan-agnostic fields (common for all plans for this variant).
617                keys.fmt_text(f, ctx)?;
618
619                ctx.indent.reset(); // reset the original indent level
620            }
621            Let { id, value, body } => {
622                let mut bindings = vec![(id, value.as_ref())];
623                let mut head = body.as_ref();
624
625                // Render Let-blocks nested in the body an outer Let-block in one step
626                // with a flattened list of bindings
627                while let Let { id, value, body } = &head.node {
628                    bindings.push((id, value.as_ref()));
629                    head = body.as_ref();
630                }
631
632                writeln!(f, "{}With", ctx.indent)?;
633                ctx.indented(|ctx| {
634                    for (id, value) in bindings.iter() {
635                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
636                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
637                    }
638                    Ok(())
639                })?;
640                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
641                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
642            }
643            LetRec {
644                ids,
645                values,
646                limits,
647                body,
648            } => {
649                let head = body.as_ref();
650
651                writeln!(f, "{}With Mutually Recursive", ctx.indent)?;
652                ctx.indented(|ctx| {
653                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
654                    for ((id, value), limit) in bindings {
655                        if let Some(limit) = limit {
656                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
657                        } else {
658                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
659                        }
660                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
661                    }
662                    Ok(())
663                })?;
664                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
665                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
666            }
667            Mfp {
668                input,
669                mfp,
670                input_key_val,
671            } => {
672                writeln!(f, "{}Mfp{}", ctx.indent, annotations)?;
673                ctx.indented(|ctx| {
674                    mode.expr(mfp, None).fmt_text(f, ctx)?;
675                    if let Some((key, val)) = input_key_val {
676                        {
677                            let key = mode.seq(key, None);
678                            let key = CompactScalars(key);
679                            writeln!(f, "{}input_key={}", ctx.indent, key)?;
680                        }
681                        if let Some(val) = val {
682                            let val = mode.expr(val, None);
683                            writeln!(f, "{}input_val={}", ctx.indent, val)?;
684                        }
685                    }
686                    input.fmt_text(f, ctx)
687                })?;
688            }
689            FlatMap {
690                input_key,
691                input,
692                exprs,
693                func,
694                mfp_after,
695            } => {
696                let exprs = mode.seq(exprs, None);
697                let exprs = CompactScalars(exprs);
698                writeln!(
699                    f,
700                    "{}FlatMap {}({}){}",
701                    ctx.indent, func, exprs, annotations
702                )?;
703                ctx.indented(|ctx| {
704                    if let Some(key) = input_key {
705                        let key = mode.seq(key, None);
706                        let key = CompactScalars(key);
707                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
708                    }
709                    if !mfp_after.is_identity() {
710                        writeln!(f, "{}mfp_after", ctx.indent)?;
711                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
712                    }
713                    input.fmt_text(f, ctx)
714                })?;
715            }
716            Join { inputs, plan } => {
717                use crate::plan::join::JoinPlan;
718                match plan {
719                    JoinPlan::Linear(plan) => {
720                        writeln!(f, "{}Join::Linear{}", ctx.indent, annotations)?;
721                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
722                    }
723                    JoinPlan::Delta(plan) => {
724                        writeln!(f, "{}Join::Delta{}", ctx.indent, annotations)?;
725                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
726                    }
727                }
728                ctx.indented(|ctx| {
729                    for input in inputs {
730                        input.fmt_text(f, ctx)?;
731                    }
732                    Ok(())
733                })?;
734            }
735            Reduce {
736                input_key,
737                input,
738                key_val_plan,
739                plan,
740                mfp_after,
741            } => {
742                use crate::plan::reduce::ReducePlan;
743                match plan {
744                    ReducePlan::Distinct => {
745                        writeln!(f, "{}Reduce::Distinct{}", ctx.indent, annotations)?;
746                    }
747                    ReducePlan::Accumulable(plan) => {
748                        writeln!(f, "{}Reduce::Accumulable{}", ctx.indent, annotations)?;
749                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
750                    }
751                    ReducePlan::Hierarchical(plan) => {
752                        writeln!(f, "{}Reduce::Hierarchical{}", ctx.indent, annotations)?;
753                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
754                    }
755                    ReducePlan::Basic(plan) => {
756                        writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
757                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
758                    }
759                }
760                ctx.indented(|ctx| {
761                    if let Some(key) = input_key {
762                        let key = mode.seq(key, None);
763                        let key = CompactScalars(key);
764                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
765                    }
766                    if key_val_plan.key_plan.deref().is_identity() {
767                        writeln!(f, "{}key_plan=id", ctx.indent)?;
768                    } else {
769                        writeln!(f, "{}key_plan", ctx.indent)?;
770                        ctx.indented(|ctx| {
771                            let key_plan = mode.expr(key_val_plan.key_plan.deref(), None);
772                            key_plan.fmt_text(f, ctx)
773                        })?;
774                    }
775                    if key_val_plan.val_plan.deref().is_identity() {
776                        writeln!(f, "{}val_plan=id", ctx.indent)?;
777                    } else {
778                        writeln!(f, "{}val_plan", ctx.indent)?;
779                        ctx.indented(|ctx| {
780                            let val_plan = mode.expr(key_val_plan.val_plan.deref(), None);
781                            val_plan.fmt_text(f, ctx)
782                        })?;
783                    }
784                    if !mfp_after.is_identity() {
785                        writeln!(f, "{}mfp_after", ctx.indent)?;
786                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
787                    }
788
789                    input.fmt_text(f, ctx)
790                })?;
791            }
792            TopK { input, top_k_plan } => {
793                use crate::plan::top_k::TopKPlan;
794                match top_k_plan {
795                    TopKPlan::MonotonicTop1(plan) => {
796                        write!(f, "{}TopK::MonotonicTop1", ctx.indent)?;
797                        if plan.group_key.len() > 0 {
798                            let group_by = mode.seq(&plan.group_key, None);
799                            let group_by = CompactScalars(group_by);
800                            write!(f, " group_by=[{}]", group_by)?;
801                        }
802                        if plan.order_key.len() > 0 {
803                            let order_by = mode.seq(&plan.order_key, None);
804                            let order_by = separated(", ", order_by);
805                            write!(f, " order_by=[{}]", order_by)?;
806                        }
807                        if plan.must_consolidate {
808                            write!(f, " must_consolidate")?;
809                        }
810                    }
811                    TopKPlan::MonotonicTopK(plan) => {
812                        write!(f, "{}TopK::MonotonicTopK", ctx.indent)?;
813                        if plan.group_key.len() > 0 {
814                            let group_by = mode.seq(&plan.group_key, None);
815                            let group_by = CompactScalars(group_by);
816                            write!(f, " group_by=[{}]", group_by)?;
817                        }
818                        if plan.order_key.len() > 0 {
819                            let order_by = mode.seq(&plan.order_key, None);
820                            let order_by = separated(", ", order_by);
821                            write!(f, " order_by=[{}]", order_by)?;
822                        }
823                        if let Some(limit) = &plan.limit {
824                            let limit = mode.expr(limit, None);
825                            write!(f, " limit={}", limit)?;
826                        }
827                        if plan.must_consolidate {
828                            write!(f, " must_consolidate")?;
829                        }
830                    }
831                    TopKPlan::Basic(plan) => {
832                        write!(f, "{}TopK::Basic", ctx.indent)?;
833                        if plan.group_key.len() > 0 {
834                            let group_by = mode.seq(&plan.group_key, None);
835                            let group_by = CompactScalars(group_by);
836                            write!(f, " group_by=[{}]", group_by)?;
837                        }
838                        if plan.order_key.len() > 0 {
839                            let order_by = mode.seq(&plan.order_key, None);
840                            let order_by = separated(", ", order_by);
841                            write!(f, " order_by=[{}]", order_by)?;
842                        }
843                        if let Some(limit) = &plan.limit {
844                            let limit = mode.expr(limit, None);
845                            write!(f, " limit={}", limit)?;
846                        }
847                        if &plan.offset > &0 {
848                            write!(f, " offset={}", plan.offset)?;
849                        }
850                    }
851                }
852                writeln!(f, "{}", annotations)?;
853                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
854            }
855            Negate { input } => {
856                writeln!(f, "{}Negate{}", ctx.indent, annotations)?;
857                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
858            }
859            Threshold {
860                input,
861                threshold_plan,
862            } => {
863                use crate::plan::threshold::ThresholdPlan;
864                match threshold_plan {
865                    ThresholdPlan::Basic(plan) => {
866                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
867                        write!(f, "{}Threshold::Basic", ctx.indent)?;
868                        write!(f, " ensure_arrangement=")?;
869                        ensure_arrangement.fmt_text(f, ctx)?;
870                        writeln!(f, "{}", annotations)?;
871                    }
872                };
873                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
874            }
875            Union {
876                inputs,
877                consolidate_output,
878            } => {
879                if *consolidate_output {
880                    writeln!(
881                        f,
882                        "{}Union consolidate_output={}{}",
883                        ctx.indent, consolidate_output, annotations
884                    )?;
885                } else {
886                    writeln!(f, "{}Union{}", ctx.indent, annotations)?;
887                }
888                ctx.indented(|ctx| {
889                    for input in inputs.iter() {
890                        input.fmt_text(f, ctx)?;
891                    }
892                    Ok(())
893                })?;
894            }
895            ArrangeBy {
896                input_key,
897                input,
898                input_mfp,
899                forms,
900            } => {
901                writeln!(f, "{}ArrangeBy{}", ctx.indent, annotations)?;
902                ctx.indented(|ctx| {
903                    if let Some(key) = input_key {
904                        let key = mode.seq(key, None);
905                        let key = CompactScalars(key);
906                        writeln!(f, "{}input_key=[{}]", ctx.indent, key)?;
907                    }
908                    mode.expr(input_mfp, None).fmt_text(f, ctx)?;
909                    forms.fmt_text(f, ctx)?;
910                    // Render input
911                    input.fmt_text(f, ctx)
912                })?;
913            }
914        }
915
916        Ok(())
917    }
918}
919
920impl DisplayText<PlanRenderingContext<'_, Plan>> for AvailableCollections {
921    fn fmt_text(
922        &self,
923        f: &mut fmt::Formatter<'_>,
924        ctx: &mut PlanRenderingContext<'_, Plan>,
925    ) -> fmt::Result {
926        if ctx.config.verbose_syntax {
927            self.fmt_verbose_text(f, ctx)
928        } else {
929            self.fmt_default_text(f, ctx)
930        }
931    }
932}
933impl AvailableCollections {
934    fn fmt_default_text(
935        &self,
936        f: &mut fmt::Formatter<'_>,
937        ctx: &mut PlanRenderingContext<'_, Plan>,
938    ) -> fmt::Result {
939        let plural = if self.arranged.len() == 1 { "" } else { "s" };
940        write!(
941            f,
942            "{}Keys: {} arrangement{plural} available",
943            ctx.indent,
944            self.arranged.len()
945        )?;
946
947        if self.raw {
948            writeln!(f, ", plus raw stream")?;
949        } else {
950            writeln!(f, ", no raw stream")?;
951        }
952
953        ctx.indented(|ctx| {
954            for (i, arrangement) in self.arranged.iter().enumerate() {
955                let arrangement = Arrangement::from(arrangement);
956                write!(f, "{}Arrangement {i}: ", ctx.indent)?;
957                arrangement.fmt_text(f, ctx)?;
958                writeln!(f, "")?;
959            }
960            Ok(())
961        })?;
962
963        Ok(())
964    }
965
966    fn fmt_verbose_text(
967        &self,
968        f: &mut fmt::Formatter<'_>,
969        ctx: &mut PlanRenderingContext<'_, Plan>,
970    ) -> fmt::Result {
971        // raw field
972        let raw = &self.raw;
973        writeln!(f, "{}raw={}", ctx.indent, raw)?;
974        // arranged field
975        for (i, arrangement) in self.arranged.iter().enumerate() {
976            let arrangement = Arrangement::from(arrangement);
977            write!(f, "{}arrangements[{}]=", ctx.indent, i)?;
978            arrangement.fmt_text(f, ctx)?;
979            writeln!(f, "")?;
980        }
981        Ok(())
982    }
983}
984
985impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearJoinPlan {
986    fn fmt_text(
987        &self,
988        f: &mut fmt::Formatter<'_>,
989        ctx: &mut PlanRenderingContext<'_, Plan>,
990    ) -> fmt::Result {
991        if ctx.config.verbose_syntax {
992            self.fmt_verbose_text(f, ctx)
993        } else {
994            self.fmt_default_text(f, ctx)
995        }
996    }
997}
998impl LinearJoinPlan {
999    #[allow(clippy::needless_pass_by_ref_mut)]
1000    fn fmt_default_text(
1001        &self,
1002        f: &mut fmt::Formatter<'_>,
1003        ctx: &mut PlanRenderingContext<'_, Plan>,
1004    ) -> fmt::Result {
1005        for (i, plan) in self.stage_plans.iter().enumerate().rev() {
1006            let lookup_relation = &plan.lookup_relation;
1007            write!(f, "{}Join stage {i} in %{lookup_relation}", ctx.indent)?;
1008            if !plan.lookup_key.is_empty() {
1009                let lookup_key = CompactScalarSeq(&plan.lookup_key);
1010                writeln!(f, " with lookup key {lookup_key}",)?;
1011            } else {
1012                writeln!(f)?;
1013            }
1014            if plan.closure.maps_or_filters() {
1015                ctx.indented(|ctx| plan.closure.fmt_default_text(f, ctx))?;
1016            }
1017        }
1018        if let Some(final_closure) = &self.final_closure {
1019            if final_closure.maps_or_filters() {
1020                ctx.indented(|ctx| {
1021                    writeln!(f, "{}Final closure:", ctx.indent)?;
1022                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1023                })?;
1024            }
1025        }
1026        Ok(())
1027    }
1028
1029    fn fmt_verbose_text(
1030        &self,
1031        f: &mut fmt::Formatter<'_>,
1032        ctx: &mut PlanRenderingContext<'_, Plan>,
1033    ) -> fmt::Result {
1034        let mode = HumanizedExplain::new(ctx.config.redacted);
1035        let plan = self;
1036        if let Some(closure) = plan.final_closure.as_ref() {
1037            if !closure.is_identity() {
1038                writeln!(f, "{}final_closure", ctx.indent)?;
1039                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1040            }
1041        }
1042        for (i, plan) in plan.stage_plans.iter().enumerate() {
1043            writeln!(f, "{}linear_stage[{}]", ctx.indent, i)?;
1044            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1045        }
1046        if let Some(closure) = plan.initial_closure.as_ref() {
1047            if !closure.is_identity() {
1048                writeln!(f, "{}initial_closure", ctx.indent)?;
1049                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1050            }
1051        }
1052        match &plan.source_key {
1053            Some(source_key) => {
1054                let source_key = mode.seq(source_key, None);
1055                let source_key = CompactScalars(source_key);
1056                writeln!(
1057                    f,
1058                    "{}source={{ relation={}, key=[{}] }}",
1059                    ctx.indent, &plan.source_relation, source_key
1060                )?
1061            }
1062            None => writeln!(
1063                f,
1064                "{}source={{ relation={}, key=[] }}",
1065                ctx.indent, &plan.source_relation
1066            )?,
1067        };
1068        Ok(())
1069    }
1070}
1071
1072impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearStagePlan {
1073    fn fmt_text(
1074        &self,
1075        f: &mut fmt::Formatter<'_>,
1076        ctx: &mut PlanRenderingContext<'_, Plan>,
1077    ) -> fmt::Result {
1078        if ctx.config.verbose_syntax {
1079            self.fmt_verbose_text(f, ctx)
1080        } else {
1081            self.fmt_default_text(f, ctx)
1082        }
1083    }
1084}
1085impl LinearStagePlan {
1086    #[allow(clippy::needless_pass_by_ref_mut)]
1087    fn fmt_default_text(
1088        &self,
1089        f: &mut fmt::Formatter<'_>,
1090        ctx: &mut PlanRenderingContext<'_, Plan>,
1091    ) -> fmt::Result {
1092        // NB this code path should not be live, as fmt_default_text for
1093        // `LinearJoinPlan` prints out each stage already
1094        let lookup_relation = &self.lookup_relation;
1095        if !self.lookup_key.is_empty() {
1096            let lookup_key = CompactScalarSeq(&self.lookup_key);
1097            writeln!(
1098                f,
1099                "{}Lookup key {lookup_key} in %{lookup_relation}",
1100                ctx.indent
1101            )
1102        } else {
1103            writeln!(f, "{}Lookup in %{lookup_relation}", ctx.indent)
1104        }
1105    }
1106
1107    fn fmt_verbose_text(
1108        &self,
1109        f: &mut fmt::Formatter<'_>,
1110        ctx: &mut PlanRenderingContext<'_, Plan>,
1111    ) -> fmt::Result {
1112        let mode = HumanizedExplain::new(ctx.config.redacted);
1113
1114        let plan = self;
1115        if !plan.closure.is_identity() {
1116            writeln!(f, "{}closure", ctx.indent)?;
1117            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1118        }
1119        {
1120            let lookup_relation = &plan.lookup_relation;
1121            let lookup_key = CompactScalarSeq(&plan.lookup_key);
1122            writeln!(
1123                f,
1124                "{}lookup={{ relation={}, key=[{}] }}",
1125                ctx.indent, lookup_relation, lookup_key
1126            )?;
1127        }
1128        {
1129            let stream_key = mode.seq(&plan.stream_key, None);
1130            let stream_key = CompactScalars(stream_key);
1131            let stream_thinning = Indices(&plan.stream_thinning);
1132            writeln!(
1133                f,
1134                "{}stream={{ key=[{}], thinning=({}) }}",
1135                ctx.indent, stream_key, stream_thinning
1136            )?;
1137        }
1138        Ok(())
1139    }
1140}
1141
1142impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaJoinPlan {
1143    fn fmt_text(
1144        &self,
1145        f: &mut fmt::Formatter<'_>,
1146        ctx: &mut PlanRenderingContext<'_, Plan>,
1147    ) -> fmt::Result {
1148        if ctx.config.verbose_syntax {
1149            self.fmt_verbose_text(f, ctx)
1150        } else {
1151            self.fmt_default_text(f, ctx)
1152        }
1153    }
1154}
1155impl DeltaJoinPlan {
1156    fn fmt_default_text(
1157        &self,
1158        f: &mut fmt::Formatter<'_>,
1159        ctx: &mut PlanRenderingContext<'_, Plan>,
1160    ) -> fmt::Result {
1161        for (i, plan) in self.path_plans.iter().enumerate() {
1162            writeln!(f, "{}Delta join path for input %{i}", ctx.indent)?;
1163            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1164        }
1165        Ok(())
1166    }
1167
1168    fn fmt_verbose_text(
1169        &self,
1170        f: &mut fmt::Formatter<'_>,
1171        ctx: &mut PlanRenderingContext<'_, Plan>,
1172    ) -> fmt::Result {
1173        for (i, plan) in self.path_plans.iter().enumerate() {
1174            writeln!(f, "{}plan_path[{}]", ctx.indent, i)?;
1175            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1176        }
1177        Ok(())
1178    }
1179}
1180
1181impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaPathPlan {
1182    fn fmt_text(
1183        &self,
1184        f: &mut fmt::Formatter<'_>,
1185        ctx: &mut PlanRenderingContext<'_, Plan>,
1186    ) -> fmt::Result {
1187        if ctx.config.verbose_syntax {
1188            self.fmt_verbose_text(f, ctx)
1189        } else {
1190            self.fmt_default_text(f, ctx)
1191        }
1192    }
1193}
1194
1195impl DeltaPathPlan {
1196    #[allow(clippy::needless_pass_by_ref_mut)]
1197    fn fmt_default_text(
1198        &self,
1199        f: &mut fmt::Formatter<'_>,
1200        ctx: &mut PlanRenderingContext<'_, Plan>,
1201    ) -> fmt::Result {
1202        for (
1203            i,
1204            DeltaStagePlan {
1205                lookup_key,
1206                lookup_relation,
1207                closure,
1208                ..
1209            },
1210        ) in self.stage_plans.iter().enumerate()
1211        {
1212            if !lookup_key.is_empty() {
1213                let lookup_key = CompactScalarSeq(lookup_key);
1214                writeln!(
1215                    f,
1216                    "{}stage {i} for %{lookup_relation}: lookup key {lookup_key}",
1217                    ctx.indent
1218                )?;
1219            } else {
1220                writeln!(f, "{}stage %{i} for  %{lookup_relation}", ctx.indent)?;
1221            }
1222            if closure.maps_or_filters() {
1223                ctx.indented(|ctx| closure.fmt_default_text(f, ctx))?;
1224            }
1225        }
1226        if let Some(final_closure) = &self.final_closure {
1227            if final_closure.maps_or_filters() {
1228                ctx.indented(|ctx| {
1229                    writeln!(f, "{}Final closure:", ctx.indent)?;
1230                    ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))
1231                })?;
1232            }
1233        }
1234        Ok(())
1235    }
1236
1237    fn fmt_verbose_text(
1238        &self,
1239        f: &mut fmt::Formatter<'_>,
1240        ctx: &mut PlanRenderingContext<'_, Plan>,
1241    ) -> fmt::Result {
1242        let mode = HumanizedExplain::new(ctx.config.redacted);
1243        let plan = self;
1244        if let Some(closure) = plan.final_closure.as_ref() {
1245            if !closure.is_identity() {
1246                writeln!(f, "{}final_closure", ctx.indent)?;
1247                ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1248            }
1249        }
1250        for (i, plan) in plan.stage_plans.iter().enumerate().rev() {
1251            writeln!(f, "{}delta_stage[{}]", ctx.indent, i)?;
1252            ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1253        }
1254        if !plan.initial_closure.is_identity() {
1255            writeln!(f, "{}initial_closure", ctx.indent)?;
1256            ctx.indented(|ctx| plan.initial_closure.fmt_text(f, ctx))?;
1257        }
1258        {
1259            let source_relation = &plan.source_relation;
1260            let source_key = mode.seq(&plan.source_key, None);
1261            let source_key = CompactScalars(source_key);
1262            writeln!(
1263                f,
1264                "{}source={{ relation={}, key=[{}] }}",
1265                ctx.indent, source_relation, source_key
1266            )?;
1267        }
1268        Ok(())
1269    }
1270}
1271
1272impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaStagePlan {
1273    fn fmt_text(
1274        &self,
1275        f: &mut fmt::Formatter<'_>,
1276        ctx: &mut PlanRenderingContext<'_, Plan>,
1277    ) -> fmt::Result {
1278        if ctx.config.verbose_syntax {
1279            self.fmt_verbose_text(f, ctx)
1280        } else {
1281            self.fmt_default_text(f, ctx)
1282        }
1283    }
1284}
1285impl DeltaStagePlan {
1286    #[allow(clippy::needless_pass_by_ref_mut)]
1287    fn fmt_default_text(
1288        &self,
1289        f: &mut fmt::Formatter<'_>,
1290        ctx: &mut PlanRenderingContext<'_, Plan>,
1291    ) -> fmt::Result {
1292        // NB this code path should not be live, as fmt_default_text for
1293        // `DeltaPathPlan` prints out each stage already
1294        let lookup_relation = &self.lookup_relation;
1295        let lookup_key = CompactScalarSeq(&self.lookup_key);
1296        writeln!(
1297            f,
1298            "{}Lookup key {lookup_key} in %{lookup_relation}",
1299            ctx.indent
1300        )
1301    }
1302
1303    fn fmt_verbose_text(
1304        &self,
1305        f: &mut fmt::Formatter<'_>,
1306        ctx: &mut PlanRenderingContext<'_, Plan>,
1307    ) -> fmt::Result {
1308        let mode = HumanizedExplain::new(ctx.config.redacted);
1309        let plan = self;
1310        if !plan.closure.is_identity() {
1311            writeln!(f, "{}closure", ctx.indent)?;
1312            ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1313        }
1314        {
1315            let lookup_relation = &plan.lookup_relation;
1316            let lookup_key = mode.seq(&plan.lookup_key, None);
1317            let lookup_key = CompactScalars(lookup_key);
1318            writeln!(
1319                f,
1320                "{}lookup={{ relation={}, key=[{}] }}",
1321                ctx.indent, lookup_relation, lookup_key
1322            )?;
1323        }
1324        {
1325            let stream_key = mode.seq(&plan.stream_key, None);
1326            let stream_key = CompactScalars(stream_key);
1327            let stream_thinning = mode.seq(&plan.stream_thinning, None);
1328            let stream_thinning = CompactScalars(stream_thinning);
1329            writeln!(
1330                f,
1331                "{}stream={{ key=[{}], thinning=({}) }}",
1332                ctx.indent, stream_key, stream_thinning
1333            )?;
1334        }
1335        Ok(())
1336    }
1337}
1338
1339impl DisplayText<PlanRenderingContext<'_, Plan>> for JoinClosure {
1340    fn fmt_text(
1341        &self,
1342        f: &mut fmt::Formatter<'_>,
1343        ctx: &mut PlanRenderingContext<'_, Plan>,
1344    ) -> fmt::Result {
1345        if ctx.config.verbose_syntax {
1346            self.fmt_verbose_text(f, ctx)
1347        } else {
1348            self.fmt_default_text(f, ctx)
1349        }
1350    }
1351}
1352impl JoinClosure {
1353    fn fmt_default_text(
1354        &self,
1355        f: &mut fmt::Formatter<'_>,
1356        ctx: &mut PlanRenderingContext<'_, Plan>,
1357    ) -> fmt::Result {
1358        let mode = HumanizedExplain::new(ctx.config.redacted);
1359        if !self.before.expressions.is_empty() || !self.before.predicates.is_empty() {
1360            mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1361        }
1362        if !self.ready_equivalences.is_empty() {
1363            let equivalences = separated(
1364                " AND ",
1365                self.ready_equivalences
1366                    .iter()
1367                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1368            );
1369            writeln!(f, "{}Equivalences: {equivalences}", ctx.indent)?;
1370        }
1371        Ok(())
1372    }
1373
1374    fn fmt_verbose_text(
1375        &self,
1376        f: &mut fmt::Formatter<'_>,
1377        ctx: &mut PlanRenderingContext<'_, Plan>,
1378    ) -> fmt::Result {
1379        let mode = HumanizedExplain::new(ctx.config.redacted);
1380        mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1381        if !self.ready_equivalences.is_empty() {
1382            let equivalences = separated(
1383                " AND ",
1384                self.ready_equivalences
1385                    .iter()
1386                    .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1387            );
1388            writeln!(f, "{}ready_equivalences={}", ctx.indent, equivalences)?;
1389        }
1390        Ok(())
1391    }
1392}
1393
1394impl DisplayText<PlanRenderingContext<'_, Plan>> for AccumulablePlan {
1395    fn fmt_text(
1396        &self,
1397        f: &mut fmt::Formatter<'_>,
1398        ctx: &mut PlanRenderingContext<'_, Plan>,
1399    ) -> fmt::Result {
1400        if ctx.config.verbose_syntax {
1401            self.fmt_verbose_text(f, ctx)
1402        } else {
1403            self.fmt_default_text(f, ctx)
1404        }
1405    }
1406}
1407impl AccumulablePlan {
1408    #[allow(clippy::needless_pass_by_ref_mut)]
1409    fn fmt_default_text(
1410        &self,
1411        f: &mut fmt::Formatter<'_>,
1412        ctx: &mut PlanRenderingContext<'_, Plan>,
1413    ) -> fmt::Result {
1414        let mode = HumanizedExplain::new(ctx.config.redacted);
1415
1416        if !self.simple_aggrs.is_empty() {
1417            let simple_aggrs = self
1418                .simple_aggrs
1419                .iter()
1420                .map(|(_i_datum, agg)| mode.expr(agg, None));
1421            let simple_aggrs = separated(", ", simple_aggrs);
1422            writeln!(f, "{}Simple aggregates: {simple_aggrs}", ctx.indent)?;
1423        }
1424
1425        if !self.distinct_aggrs.is_empty() {
1426            let distinct_aggrs = self
1427                .distinct_aggrs
1428                .iter()
1429                .map(|(_i_datum, agg)| mode.expr(agg, None));
1430            let distinct_aggrs = separated(", ", distinct_aggrs);
1431            writeln!(f, "{}Distinct aggregates: {distinct_aggrs}", ctx.indent)?;
1432        }
1433        Ok(())
1434    }
1435
1436    #[allow(clippy::needless_pass_by_ref_mut)]
1437    fn fmt_verbose_text(
1438        &self,
1439        f: &mut fmt::Formatter<'_>,
1440        ctx: &mut PlanRenderingContext<'_, Plan>,
1441    ) -> fmt::Result {
1442        let mode = HumanizedExplain::new(ctx.config.redacted);
1443        // full_aggrs (skipped because they are repeated in simple_aggrs ∪ distinct_aggrs)
1444        // for (i, aggr) in self.full_aggrs.iter().enumerate() {
1445        //     write!(f, "{}full_aggrs[{}]=", ctx.indent, i)?;
1446        //     aggr.fmt_text(f, &mut ())?;
1447        //     writeln!(f)?;
1448        // }
1449        // simple_aggrs
1450        for (i, (i_datum, agg)) in self.simple_aggrs.iter().enumerate() {
1451            let agg = mode.expr(agg, None);
1452            write!(f, "{}simple_aggrs[{}]=", ctx.indent, i)?;
1453            writeln!(f, "({}, {})", i_datum, agg)?;
1454        }
1455        // distinct_aggrs
1456        for (i, (i_datum, agg)) in self.distinct_aggrs.iter().enumerate() {
1457            let agg = mode.expr(agg, None);
1458            write!(f, "{}distinct_aggrs[{}]=", ctx.indent, i)?;
1459            writeln!(f, "({}, {})", i_datum, agg)?;
1460        }
1461        Ok(())
1462    }
1463}
1464
1465impl DisplayText<PlanRenderingContext<'_, Plan>> for HierarchicalPlan {
1466    fn fmt_text(
1467        &self,
1468        f: &mut fmt::Formatter<'_>,
1469        ctx: &mut PlanRenderingContext<'_, Plan>,
1470    ) -> fmt::Result {
1471        if ctx.config.verbose_syntax {
1472            self.fmt_verbose_text(f, ctx)
1473        } else {
1474            self.fmt_default_text(f, ctx)
1475        }
1476    }
1477}
1478impl HierarchicalPlan {
1479    #[allow(clippy::needless_pass_by_ref_mut)]
1480    fn fmt_default_text(
1481        &self,
1482        f: &mut fmt::Formatter<'_>,
1483        ctx: &mut PlanRenderingContext<'_, Plan>,
1484    ) -> fmt::Result {
1485        let mode = HumanizedExplain::new(ctx.config.redacted);
1486        let aggr_funcs = mode.seq(self.aggr_funcs(), None);
1487        let aggr_funcs = separated(", ", aggr_funcs);
1488        writeln!(f, "{}Aggregations: {aggr_funcs}", ctx.indent)
1489    }
1490
1491    #[allow(clippy::needless_pass_by_ref_mut)]
1492    fn fmt_verbose_text(
1493        &self,
1494        f: &mut fmt::Formatter<'_>,
1495        ctx: &mut PlanRenderingContext<'_, Plan>,
1496    ) -> fmt::Result {
1497        let mode = HumanizedExplain::new(ctx.config.redacted);
1498        match self {
1499            HierarchicalPlan::Monotonic(plan) => {
1500                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1501                let aggr_funcs = separated(", ", aggr_funcs);
1502                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1503                writeln!(f, "{}monotonic", ctx.indent)?;
1504                if plan.must_consolidate {
1505                    writeln!(f, "{}must_consolidate", ctx.indent)?;
1506                }
1507            }
1508            HierarchicalPlan::Bucketed(plan) => {
1509                let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1510                let aggr_funcs = separated(", ", aggr_funcs);
1511                writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1512                let buckets = separated(", ", &plan.buckets);
1513                writeln!(f, "{}buckets=[{}]", ctx.indent, buckets)?;
1514            }
1515        }
1516        Ok(())
1517    }
1518}
1519
1520impl DisplayText<PlanRenderingContext<'_, Plan>> for BasicPlan {
1521    fn fmt_text(
1522        &self,
1523        f: &mut fmt::Formatter<'_>,
1524        ctx: &mut PlanRenderingContext<'_, Plan>,
1525    ) -> fmt::Result {
1526        if ctx.config.verbose_syntax {
1527            self.fmt_verbose_text(f, ctx)
1528        } else {
1529            self.fmt_default_text(f, ctx)
1530        }
1531    }
1532}
1533impl BasicPlan {
1534    #[allow(clippy::needless_pass_by_ref_mut)]
1535    fn fmt_default_text(
1536        &self,
1537        f: &mut fmt::Formatter<'_>,
1538        ctx: &mut PlanRenderingContext<'_, Plan>,
1539    ) -> fmt::Result {
1540        let mode = HumanizedExplain::new(ctx.config.redacted);
1541        match self {
1542            BasicPlan::Single(SingleBasicPlan {
1543                expr,
1544                fused_unnest_list: _,
1545            }) => {
1546                let agg = mode.expr(expr, None);
1547                writeln!(f, "{}Aggregation: {agg}", ctx.indent)?;
1548            }
1549            BasicPlan::Multiple(aggs) => {
1550                let mode = HumanizedExplain::new(ctx.config.redacted);
1551                write!(f, "{}Aggregations:", ctx.indent)?;
1552
1553                for agg in aggs.iter() {
1554                    let agg = mode.expr(agg, None);
1555                    write!(f, " {agg}")?;
1556                }
1557                writeln!(f)?;
1558            }
1559        }
1560        Ok(())
1561    }
1562
1563    #[allow(clippy::needless_pass_by_ref_mut)]
1564    fn fmt_verbose_text(
1565        &self,
1566        f: &mut fmt::Formatter<'_>,
1567        ctx: &mut PlanRenderingContext<'_, Plan>,
1568    ) -> fmt::Result {
1569        let mode = HumanizedExplain::new(ctx.config.redacted);
1570        match self {
1571            BasicPlan::Single(SingleBasicPlan {
1572                expr,
1573                fused_unnest_list,
1574            }) => {
1575                let agg = mode.expr(expr, None);
1576                let fused_unnest_list = if *fused_unnest_list {
1577                    ", fused_unnest_list=true"
1578                } else {
1579                    ""
1580                };
1581                writeln!(f, "{}aggr=({}{})", ctx.indent, agg, fused_unnest_list)?;
1582            }
1583            BasicPlan::Multiple(aggs) => {
1584                for (i, agg) in aggs.iter().enumerate() {
1585                    let agg = mode.expr(agg, None);
1586                    writeln!(f, "{}aggrs[{}]={}", ctx.indent, i, agg)?;
1587                }
1588            }
1589        }
1590        Ok(())
1591    }
1592}
1593
1594/// Helper struct for rendering an arrangement.
1595struct Arrangement<'a> {
1596    key: &'a Vec<MirScalarExpr>,
1597    permutation: Permutation<'a>,
1598    thinning: &'a Vec<usize>,
1599}
1600
1601impl<'a> From<&'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> for Arrangement<'a> {
1602    fn from(
1603        (key, permutation, thinning): &'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
1604    ) -> Self {
1605        Arrangement {
1606            key,
1607            permutation: Permutation(permutation),
1608            thinning,
1609        }
1610    }
1611}
1612
1613impl<'a> DisplayText<PlanRenderingContext<'_, Plan>> for Arrangement<'a> {
1614    fn fmt_text(
1615        &self,
1616        f: &mut fmt::Formatter<'_>,
1617        ctx: &mut PlanRenderingContext<'_, Plan>,
1618    ) -> fmt::Result {
1619        if ctx.config.verbose_syntax {
1620            self.fmt_verbose_text(f, ctx)
1621        } else {
1622            self.fmt_default_text(f, ctx)
1623        }
1624    }
1625}
1626
1627impl<'a> Arrangement<'a> {
1628    #[allow(clippy::needless_pass_by_ref_mut)]
1629    fn fmt_default_text(
1630        &self,
1631        f: &mut fmt::Formatter<'_>,
1632        ctx: &PlanRenderingContext<'_, Plan>,
1633    ) -> fmt::Result {
1634        let mode = HumanizedExplain::new(ctx.config.redacted);
1635        if !self.key.is_empty() {
1636            let key = mode.seq(self.key, None);
1637            let key = CompactScalars(key);
1638            write!(f, "{key}")
1639        } else {
1640            write!(f, "(empty key)")
1641        }
1642    }
1643
1644    #[allow(clippy::needless_pass_by_ref_mut)]
1645    fn fmt_verbose_text(
1646        &self,
1647        f: &mut fmt::Formatter<'_>,
1648        ctx: &PlanRenderingContext<'_, Plan>,
1649    ) -> fmt::Result {
1650        let mode = HumanizedExplain::new(ctx.config.redacted);
1651        // prepare key
1652        let key = mode.seq(self.key, None);
1653        let key = CompactScalars(key);
1654        // prepare perumation map
1655        let permutation = &self.permutation;
1656        // prepare thinning
1657        let thinning = Indices(self.thinning);
1658        // write the arrangement spec
1659        write!(
1660            f,
1661            "{{ key=[{}], permutation={}, thinning=({}) }}",
1662            key, permutation, thinning
1663        )
1664    }
1665}
1666
1667/// Helper struct for rendering a permutation.
1668struct Permutation<'a>(&'a Vec<usize>);
1669
1670impl<'a> fmt::Display for Permutation<'a> {
1671    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1672        let mut pairs = vec![];
1673        for (x, y) in self.0.iter().enumerate().filter(|(x, y)| x != *y) {
1674            pairs.push(format!("#{}: #{}", x, y));
1675        }
1676
1677        if pairs.len() > 0 {
1678            write!(f, "{{{}}}", separated(", ", pairs))
1679        } else {
1680            write!(f, "id")
1681        }
1682    }
1683}
1684
1685/// Annotations for physical plans.
1686struct PlanAnnotations {
1687    config: ExplainConfig,
1688    node_id: LirId,
1689}
1690
1691// The current implementation deviates from the `AnnotatedPlan` used in `Mir~`-based plans. This is
1692// fine, since at the moment the only attribute we are going to explain is the `node_id`, which at
1693// the moment is kept inline with the `Plan` variants. If at some point in the future we want to
1694// start deriving and printing attributes that are derived ad-hoc, however, we might want to adopt
1695// `AnnotatedPlan` here as well.
1696impl PlanAnnotations {
1697    fn new(config: ExplainConfig, plan: &Plan) -> Self {
1698        let node_id = plan.lir_id;
1699        Self { config, node_id }
1700    }
1701}
1702
1703impl fmt::Display for PlanAnnotations {
1704    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1705        if self.config.node_ids {
1706            f.debug_struct(" //")
1707                .field("node_id", &self.node_id)
1708                .finish()
1709        } else {
1710            // No physical plan annotations enabled.
1711            Ok(())
1712        }
1713    }
1714}