1use std::fmt;
24use std::ops::Deref;
25
26use itertools::Itertools;
27use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
28use mz_expr::{Id, MirScalarExpr};
29use mz_ore::soft_assert_or_log;
30use mz_ore::str::{IndentLike, StrExt, separated};
31use mz_repr::explain::text::DisplayText;
32use mz_repr::explain::{
33 CompactScalarSeq, CompactScalars, ExplainConfig, ExprHumanizer, Indices, PlanRenderingContext,
34};
35
36use crate::plan::join::delta_join::{DeltaPathPlan, DeltaStagePlan};
37use crate::plan::join::linear_join::LinearStagePlan;
38use crate::plan::join::{DeltaJoinPlan, JoinClosure, LinearJoinPlan};
39use crate::plan::reduce::{
40 AccumulablePlan, BasicPlan, BucketedPlan, HierarchicalPlan, MonotonicPlan, SingleBasicPlan,
41};
42use crate::plan::threshold::ThresholdPlan;
43use crate::plan::{ArrangementStrategy, AvailableCollections, LirId, Plan, PlanNode};
44
45impl DisplayText<PlanRenderingContext<'_, Plan>> for Plan {
46 fn fmt_text(
47 &self,
48 f: &mut fmt::Formatter<'_>,
49 ctx: &mut PlanRenderingContext<'_, Plan>,
50 ) -> fmt::Result {
51 if ctx.config.verbose_syntax {
52 self.fmt_verbose_text(f, ctx)
53 } else {
54 self.fmt_default_text(f, ctx)
55 }
56 }
57}
58
59impl Plan {
    /// Renders this plan in the default (non-verbose) text syntax.
    ///
    /// Each operator is written as a line of the form `{indent}→Name{annotations}`;
    /// operator details and the operator's inputs follow on subsequent lines,
    /// rendered through `fmt_text` with a deeper `ctx.indent`.
    ///
    /// Several arms bracket manual `ctx.indent += 1` bumps between
    /// `ctx.indent.set()` and `ctx.indent.reset()`.
    /// NOTE(review): presumably `set()` saves the current level and `reset()`
    /// restores it (the arms never decrement manually) — confirm against the
    /// indent type's implementation.
    fn fmt_default_text(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'_, Plan>,
    ) -> fmt::Result {
        use PlanNode::*;

        let mode = HumanizedExplain::new(ctx.config.redacted);
        let annotations = PlanAnnotations::new(ctx.config.clone(), self);

        match &self.node {
            Constant { rows } => {
                write!(f, "{}→Constant ", ctx.indent)?;

                // Only the row count (or the error) is shown, never the rows themselves.
                match rows {
                    Ok(rows) => write!(
                        f,
                        "({} row{})",
                        rows.len(),
                        if rows.len() == 1 { "" } else { "s" }
                    )?,
                    Err(err) => {
                        // In redacted mode the error text is replaced by a placeholder.
                        if mode.redacted() {
                            write!(f, "(error: █)")?;
                        } else {
                            write!(f, "(error: {})", err.to_string().quoted(),)?;
                        }
                    }
                }

                writeln!(f, "{annotations}")?;
            }
            Get { id, keys, plan } => {
                ctx.indent.set();
                // Resolve the id to a string; global ids fall back to their raw
                // form when the humanizer returns `None`.
                let id = match id {
                    Id::Local(id) => id.to_string(),
                    Id::Global(id) => ctx
                        .humanizer
                        .humanize_id(*id)
                        .unwrap_or_else(|| id.to_string()),
                };
                use crate::plan::GetPlan;
                match plan {
                    GetPlan::PassArrangements => {
                        // "Stream" only when the raw collection is available and
                        // there are no arrangements; otherwise "Arranged".
                        if keys.raw && keys.arranged.is_empty() {
                            writeln!(f, "{}→Stream {id}{annotations}", ctx.indent)?;
                        } else {
                            writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
                        }
                    }
                    GetPlan::Arrangement(key, Some(val), mfp) => {
                        // A non-identity MFP is shown as a fused parent line, with
                        // the lookup nested below it.
                        if !mfp.is_identity() {
                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
                            ctx.indent += 1;
                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
                            ctx.indent += 1;
                        }

                        writeln!(f, "{}→Index Lookup on {id}{annotations}", ctx.indent)?;
                        ctx.indent += 1;
                        let key = CompactScalars(mode.seq(key, None));
                        // Key and value share one output line.
                        write!(f, "{}Key: ({key}) ", ctx.indent)?;
                        let val = mode.expr(val, None);
                        writeln!(f, "Value: {val}")?;
                    }
                    GetPlan::Arrangement(key, None, mfp) => {
                        if !mfp.is_identity() {
                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
                            ctx.indent += 1;
                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
                            ctx.indent += 1;
                        }

                        writeln!(f, "{}→Arranged {id}{annotations}", ctx.indent)?;
                        ctx.indent += 1;
                        let key = CompactScalars(mode.seq(key, None));
                        writeln!(f, "{}Key: ({key})", ctx.indent)?;
                    }
                    GetPlan::Collection(mfp) => {
                        if !mfp.is_identity() {
                            writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
                            ctx.indent += 1;
                            mode.expr(mfp, None).fmt_default_text(f, ctx)?;
                            ctx.indent += 1;
                        }

                        writeln!(f, "{}→Read {id}{annotations}", ctx.indent)?;
                    }
                }
                ctx.indent.reset();
            }
            Let { id, value, body } => {
                // Flatten a chain of directly nested `Let`s into a single list of
                // bindings; `head` ends up as the first non-`Let` body.
                let mut bindings = vec![(id, value.as_ref())];
                let mut head = body.as_ref();

                while let Let { id, value, body } = &head.node {
                    bindings.push((id, value.as_ref()));
                    head = body.as_ref();
                }

                writeln!(f, "{}→With", ctx.indent)?;
                ctx.indented(|ctx| {
                    for (id, value) in bindings.iter() {
                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
                    }
                    Ok(())
                })?;
                // The annotations of this (outermost) `Let` are attached to the
                // `Return` line.
                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
            }
            LetRec {
                ids,
                values,
                limits,
                body,
            } => {
                let head = body.as_ref();

                writeln!(f, "{}→With Mutually Recursive", ctx.indent)?;
                ctx.indented(|ctx| {
                    // `zip_eq` is used so that `ids`, `values` and `limits` are
                    // walked in lockstep.
                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
                    for ((id, value), limit) in bindings {
                        // When present, the limit is printed before the id.
                        if let Some(limit) = limit {
                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
                        } else {
                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
                        }
                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
                    }
                    Ok(())
                })?;
                writeln!(f, "{}→Return{annotations}", ctx.indent)?;
                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
            }
            Mfp {
                input,
                mfp,
                input_key_val: _,
            } => {
                writeln!(f, "{}→Map/Filter/Project{annotations}", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;
                mode.expr(mfp, None).fmt_default_text(f, ctx)?;

                // The input gets one more nesting level when the MFP is not the
                // identity.
                if !mfp.is_identity() {
                    ctx.indent += 1;
                }
                input.fmt_text(f, ctx)?;
                ctx.indent.reset();
            }
            FlatMap {
                input_key: _,
                input,
                exprs,
                func,
                mfp_after,
            } => {
                ctx.indent.set();
                // Only maps and filters trigger the "fused" line here; the
                // projection part of `mfp_after` is not consulted.
                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
                    ctx.indent += 1;
                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
                    ctx.indent += 1;
                }

                let exprs = mode.seq(exprs, None);
                let exprs = CompactScalars(exprs);
                writeln!(
                    f,
                    "{}→Table Function {func}({exprs}){annotations}",
                    ctx.indent
                )?;
                ctx.indent += 1;

                input.fmt_text(f, ctx)?;

                ctx.indent.reset();
            }
            Join { inputs, plan } => {
                use crate::plan::join::JoinPlan;
                match plan {
                    JoinPlan::Linear(plan) => {
                        let label = if plan.has_cross_stage() {
                            "→Differential Cross Join"
                        } else {
                            "→Differential Join"
                        };
                        write!(f, "{}{label} ", ctx.indent)?;
                        // Header line: the source followed by every lookup stage.
                        fmt_join_chain(
                            f,
                            ctx.humanizer,
                            &mode,
                            inputs,
                            plan.source_relation,
                            plan.source_key.as_ref(),
                            plan.stage_plans
                                .iter()
                                .map(|s| (s.lookup_relation, &s.lookup_key)),
                        )?;
                        writeln!(f, "{annotations}")?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    JoinPlan::Delta(plan) => {
                        let label = if plan.has_cross_stage() {
                            "→Delta Cross Join"
                        } else {
                            "→Delta Join"
                        };
                        write!(f, "{}{label}", ctx.indent)?;
                        // One bracketed chain per delta path, all on the header line.
                        for dpp in &plan.path_plans {
                            write!(f, " [")?;
                            fmt_join_chain(
                                f,
                                ctx.humanizer,
                                &mode,
                                inputs,
                                dpp.source_relation,
                                Some(&dpp.source_key),
                                dpp.stage_plans
                                    .iter()
                                    .map(|s| (s.lookup_relation, &s.lookup_key)),
                            )?;
                            write!(f, "]")?;
                        }
                        writeln!(f, "{annotations}")?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                }

                // The join inputs follow the plan details, at the same nesting level.
                ctx.indented(|ctx| {
                    for input in inputs {
                        input.fmt_text(f, ctx)?;
                    }
                    Ok(())
                })?;
            }
            Reduce {
                input_key: _,
                input,
                key_val_plan,
                plan,
                mfp_after,
                temporal_bucketing_strategy,
            } => {
                ctx.indent.set();
                if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
                    writeln!(f, "{}→Fused with Child Map/Filter/Project", ctx.indent)?;
                    ctx.indent += 1;
                    mode.expr(mfp_after, None).fmt_default_text(f, ctx)?;
                    ctx.indent += 1;
                }

                // Adds a "Temporally-Bucketed " prefix to the operator name below.
                let temporally_bucketed = matches!(
                    temporal_bucketing_strategy,
                    ArrangementStrategy::TemporalBucketing
                );

                use crate::plan::reduce::ReducePlan;
                match plan {
                    ReducePlan::Distinct => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        writeln!(f, "Distinct GroupAggregate{annotations}")?;
                    }
                    ReducePlan::Accumulable(plan) => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        writeln!(f, "Accumulable GroupAggregate{annotations}")?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    ReducePlan::Hierarchical(
                        plan @ HierarchicalPlan::Bucketed(BucketedPlan { buckets, .. }),
                    ) => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        // The bucket values are listed space-separated inside the
                        // parentheses.
                        write!(f, "Bucketed Hierarchical GroupAggregate (buckets:")?;
                        for bucket in buckets {
                            write!(f, " {bucket}")?;
                        }
                        writeln!(f, "){annotations}")?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    ReducePlan::Hierarchical(
                        plan @ HierarchicalPlan::Monotonic(MonotonicPlan {
                            must_consolidate, ..
                        }),
                    ) => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        if *must_consolidate {
                            write!(f, "Consolidating ")?;
                        }
                        writeln!(f, "Monotonic GroupAggregate{annotations}",)?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    ReducePlan::Basic(plan) => {
                        // Nested set/reset pair around the optional extra level
                        // for the fused `unnest_list` line.
                        ctx.indent.set();
                        if let BasicPlan::Single(SingleBasicPlan {
                            fused_unnest_list, ..
                        }) = &plan
                        {
                            if *fused_unnest_list {
                                writeln!(
                                    f,
                                    "{}→Fused with Child Table Function unnest_list",
                                    ctx.indent
                                )?;
                                ctx.indent += 1;
                            }
                        }
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        writeln!(f, "Non-incremental GroupAggregate{annotations}")?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                        ctx.indent.reset();
                    }
                }

                ctx.indented(|ctx| {
                    // The key plan is only shown when it is not the identity; the
                    // value plan is not shown in this syntax.
                    let kvp = key_val_plan.key_plan.deref();
                    if !kvp.is_identity() {
                        writeln!(f, "{}Key:", ctx.indent)?;
                        ctx.indented(|ctx| {
                            let key_plan = mode.expr(kvp, None);
                            key_plan.fmt_default_text(f, ctx)
                        })?;
                    }

                    input.fmt_text(f, ctx)
                })?;

                ctx.indent.reset();
            }
            TopK {
                input,
                top_k_plan,
                temporal_bucketing_strategy,
            } => {
                let temporally_bucketed = matches!(
                    temporal_bucketing_strategy,
                    ArrangementStrategy::TemporalBucketing
                );
                use crate::plan::top_k::TopKPlan;
                match top_k_plan {
                    TopKPlan::MonotonicTop1(plan) => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        if plan.must_consolidate {
                            write!(f, "Consolidating ")?;
                        }
                        writeln!(f, "Monotonic Top1{annotations}")?;

                        // Detail lines are omitted when the corresponding key is empty.
                        ctx.indented(|ctx| {
                            if plan.group_key.len() > 0 {
                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
                            }
                            if plan.order_key.len() > 0 {
                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
                            }
                            Ok(())
                        })?;
                    }
                    TopKPlan::MonotonicTopK(plan) => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        if plan.must_consolidate {
                            write!(f, "Consolidating ")?;
                        }
                        writeln!(f, "Monotonic TopK{annotations}")?;

                        ctx.indented(|ctx| {
                            if plan.group_key.len() > 0 {
                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
                            }
                            if plan.order_key.len() > 0 {
                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
                            }
                            if let Some(limit) = &plan.limit {
                                let limit = mode.expr(limit, None);
                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
                            }
                            Ok(())
                        })?;
                    }
                    TopKPlan::Basic(plan) => {
                        write!(f, "{}→", ctx.indent)?;
                        if temporally_bucketed {
                            write!(f, "Temporally-Bucketed ")?;
                        }
                        writeln!(f, "Non-monotonic TopK{annotations}")?;

                        ctx.indented(|ctx| {
                            if plan.group_key.len() > 0 {
                                let group_by = CompactScalars(mode.seq(&plan.group_key, None));
                                writeln!(f, "{}Group By {group_by}", ctx.indent)?;
                            }
                            if plan.order_key.len() > 0 {
                                let order_by = separated(", ", mode.seq(&plan.order_key, None));
                                writeln!(f, "{}Order By {order_by}", ctx.indent)?;
                            }
                            if let Some(limit) = &plan.limit {
                                let limit = mode.expr(limit, None);
                                writeln!(f, "{}Limit {limit}", ctx.indent)?;
                            }
                            // A zero offset is not printed.
                            if plan.offset != 0 {
                                let offset = plan.offset;
                                writeln!(f, "{}Offset {offset}", ctx.indent)?;
                            }
                            Ok(())
                        })?;
                    }
                }

                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
            }
            Negate { input } => {
                writeln!(f, "{}→Negate Diffs{annotations}", ctx.indent)?;

                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                match threshold_plan {
                    ThresholdPlan::Basic(plan) => {
                        // The ensured arrangement is printed inline on the operator line.
                        write!(f, "{}→Threshold Diffs ", ctx.indent)?;
                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
                        ensure_arrangement.fmt_text(f, ctx)?;
                        writeln!(f, "{annotations}")?;
                    }
                };

                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
            }
            Union {
                inputs,
                consolidate_output,
                temporal_bucketing_strategies,
            } => {
                // A single bucketed strategy is enough to show the prefix.
                let any_temporally_bucketed = temporal_bucketing_strategies
                    .iter()
                    .any(|s| matches!(s, ArrangementStrategy::TemporalBucketing));
                write!(f, "{}→", ctx.indent)?;
                if any_temporally_bucketed {
                    write!(f, "Temporally-Bucketed ")?;
                }
                if *consolidate_output {
                    write!(f, "Consolidating ")?;
                }
                writeln!(f, "Union{annotations}")?;

                ctx.indented(|ctx| {
                    for input in inputs.iter() {
                        input.fmt_text(f, ctx)?;
                    }
                    Ok(())
                })?;
            }
            ArrangeBy {
                input_key: _,
                input,
                input_mfp,
                forms,
                strategy: _,
            } => {
                ctx.indent.set();
                if forms.raw && forms.arranged.is_empty() {
                    // NOTE(review): `forms.raw` is already known to be true in
                    // this branch, so this assertion can never fire.
                    soft_assert_or_log!(forms.raw, "raw stream with no arrangements");
                    writeln!(f, "{}→Unarranged Raw Stream{annotations}", ctx.indent)?;
                } else {
                    write!(f, "{}→Arrange", ctx.indent)?;

                    // One parenthesized key per arrangement, all on the operator line.
                    if !forms.arranged.is_empty() {
                        let mode = HumanizedExplain::new(ctx.config.redacted);
                        for (key, _, _) in &forms.arranged {
                            if !key.is_empty() {
                                let key = mode.seq(key, None);
                                let key = CompactScalars(key);
                                write!(f, " ({key})")?;
                            } else {
                                write!(f, " (empty key)")?;
                            }
                        }
                    }
                    writeln!(f, "{annotations}")?;
                }

                // Unlike the "Fused with Child" cases above, this MFP is rendered
                // below the operator line.
                if !input_mfp.is_identity() {
                    ctx.indent += 1;
                    writeln!(f, "{}→Fused with Parent Map/Filter/Project", ctx.indent)?;
                    ctx.indented(|ctx| mode.expr(input_mfp, None).fmt_default_text(f, ctx))?;
                }

                ctx.indent += 1;
                input.fmt_text(f, ctx)?;
                ctx.indent.reset();
            }
        }

        Ok(())
    }
595
    /// Renders this plan in the verbose text syntax.
    ///
    /// Each operator is written as `{indent}Variant[::Kind]{annotations}` (without
    /// the `→` marker of the default syntax), followed by `name=value` style
    /// attribute lines and the operator's inputs at a deeper indent.
    fn fmt_verbose_text(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'_, Plan>,
    ) -> fmt::Result {
        use PlanNode::*;

        let mode = HumanizedExplain::new(ctx.config.redacted);
        let annotations = PlanAnnotations::new(ctx.config.clone(), self);

        match &self.node {
            Constant { rows } => match rows {
                Ok(rows) => {
                    // Non-empty constants list their rows; empty ones get a marker.
                    if !rows.is_empty() {
                        writeln!(f, "{}Constant{}", ctx.indent, annotations)?;
                        ctx.indented(|ctx| {
                            // The middle component of each row triple is dropped.
                            fmt_text_constant_rows(
                                f,
                                rows.iter().map(|(data, _, diff)| (data, diff)),
                                &mut ctx.indent,
                                ctx.config.redacted,
                            )
                        })?;
                    } else {
                        writeln!(f, "{}Constant <empty>{}", ctx.indent, annotations)?;
                    }
                }
                Err(err) => {
                    // In redacted mode the error text is replaced by a placeholder.
                    if mode.redacted() {
                        writeln!(f, "{}Error █{}", ctx.indent, annotations)?;
                    } else {
                        {
                            writeln!(
                                f,
                                "{}Error {}{}",
                                ctx.indent,
                                err.to_string().quoted(),
                                annotations
                            )?;
                        }
                    }
                }
            },

            Get { id, keys, plan } => {
                ctx.indent.set();
                // Resolve the id to a string; global ids fall back to their raw
                // form when the humanizer returns `None`.
                let id = match id {
                    Id::Local(id) => id.to_string(),
                    Id::Global(id) => ctx
                        .humanizer
                        .humanize_id(*id)
                        .unwrap_or_else(|| id.to_string()),
                };
                use crate::plan::GetPlan;
                // Every arm bumps the indent once so that the attribute lines and
                // `keys` below are nested under the `Get::…` line.
                match plan {
                    GetPlan::PassArrangements => {
                        writeln!(
                            f,
                            "{}Get::PassArrangements {}{}",
                            ctx.indent, id, annotations
                        )?;
                        ctx.indent += 1;
                    }
                    GetPlan::Arrangement(key, val, mfp) => {
                        writeln!(f, "{}Get::Arrangement {}{}", ctx.indent, id, annotations)?;
                        ctx.indent += 1;
                        mode.expr(mfp, None).fmt_text(f, ctx)?;
                        {
                            let key = mode.seq(key, None);
                            let key = CompactScalars(key);
                            writeln!(f, "{}key={}", ctx.indent, key)?;
                        }
                        if let Some(val) = val {
                            let val = mode.expr(val, None);
                            writeln!(f, "{}val={}", ctx.indent, val)?;
                        }
                    }
                    GetPlan::Collection(mfp) => {
                        writeln!(f, "{}Get::Collection {}{}", ctx.indent, id, annotations)?;
                        ctx.indent += 1;
                        mode.expr(mfp, None).fmt_text(f, ctx)?;
                    }
                }

                keys.fmt_text(f, ctx)?;

                ctx.indent.reset();
            }
            Let { id, value, body } => {
                // Flatten a chain of directly nested `Let`s into a single list of
                // bindings; `head` ends up as the first non-`Let` body.
                let mut bindings = vec![(id, value.as_ref())];
                let mut head = body.as_ref();

                while let Let { id, value, body } = &head.node {
                    bindings.push((id, value.as_ref()));
                    head = body.as_ref();
                }

                writeln!(f, "{}With", ctx.indent)?;
                ctx.indented(|ctx| {
                    for (id, value) in bindings.iter() {
                        writeln!(f, "{}cte {} =", ctx.indent, *id)?;
                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
                    }
                    Ok(())
                })?;
                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
            }
            LetRec {
                ids,
                values,
                limits,
                body,
            } => {
                let head = body.as_ref();

                writeln!(f, "{}With Mutually Recursive", ctx.indent)?;
                ctx.indented(|ctx| {
                    // `zip_eq` is used so that `ids`, `values` and `limits` are
                    // walked in lockstep.
                    let bindings = ids.iter().zip_eq(values).zip_eq(limits);
                    for ((id, value), limit) in bindings {
                        // When present, the limit is printed before the id.
                        if let Some(limit) = limit {
                            writeln!(f, "{}cte {} {} =", ctx.indent, limit, *id)?;
                        } else {
                            writeln!(f, "{}cte {} =", ctx.indent, *id)?;
                        }
                        ctx.indented(|ctx| value.fmt_text(f, ctx))?;
                    }
                    Ok(())
                })?;
                writeln!(f, "{}Return{}", ctx.indent, annotations)?;
                ctx.indented(|ctx| head.fmt_text(f, ctx))?;
            }
            Mfp {
                input,
                mfp,
                input_key_val,
            } => {
                writeln!(f, "{}Mfp{}", ctx.indent, annotations)?;
                ctx.indented(|ctx| {
                    mode.expr(mfp, None).fmt_text(f, ctx)?;
                    // `input_key`/`input_val` lines appear only when the
                    // corresponding value is present.
                    if let Some((key, val)) = input_key_val {
                        {
                            let key = mode.seq(key, None);
                            let key = CompactScalars(key);
                            writeln!(f, "{}input_key={}", ctx.indent, key)?;
                        }
                        if let Some(val) = val {
                            let val = mode.expr(val, None);
                            writeln!(f, "{}input_val={}", ctx.indent, val)?;
                        }
                    }
                    input.fmt_text(f, ctx)
                })?;
            }
            FlatMap {
                input_key,
                input,
                exprs,
                func,
                mfp_after,
            } => {
                let exprs = mode.seq(exprs, None);
                let exprs = CompactScalars(exprs);
                writeln!(
                    f,
                    "{}FlatMap {}({}){}",
                    ctx.indent, func, exprs, annotations
                )?;
                ctx.indented(|ctx| {
                    if let Some(key) = input_key {
                        let key = mode.seq(key, None);
                        let key = CompactScalars(key);
                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
                    }
                    // An identity `mfp_after` is not shown.
                    if !mfp_after.is_identity() {
                        writeln!(f, "{}mfp_after", ctx.indent)?;
                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
                    }
                    input.fmt_text(f, ctx)
                })?;
            }
            Join { inputs, plan } => {
                use crate::plan::join::JoinPlan;
                match plan {
                    JoinPlan::Linear(plan) => {
                        writeln!(f, "{}Join::Linear{}", ctx.indent, annotations)?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    JoinPlan::Delta(plan) => {
                        writeln!(f, "{}Join::Delta{}", ctx.indent, annotations)?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                }
                // The join inputs follow the plan details, at the same nesting level.
                ctx.indented(|ctx| {
                    for input in inputs {
                        input.fmt_text(f, ctx)?;
                    }
                    Ok(())
                })?;
            }
            Reduce {
                input_key,
                input,
                key_val_plan,
                plan,
                mfp_after,
                temporal_bucketing_strategy,
            } => {
                use crate::plan::reduce::ReducePlan;
                match plan {
                    ReducePlan::Distinct => {
                        writeln!(f, "{}Reduce::Distinct{}", ctx.indent, annotations)?;
                    }
                    ReducePlan::Accumulable(plan) => {
                        writeln!(f, "{}Reduce::Accumulable{}", ctx.indent, annotations)?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    ReducePlan::Hierarchical(plan) => {
                        writeln!(f, "{}Reduce::Hierarchical{}", ctx.indent, annotations)?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                    ReducePlan::Basic(plan) => {
                        writeln!(f, "{}Reduce::Basic{}", ctx.indent, annotations)?;
                        ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
                    }
                }
                ctx.indented(|ctx| {
                    if let Some(key) = input_key {
                        let key = mode.seq(key, None);
                        let key = CompactScalars(key);
                        writeln!(f, "{}input_key={}", ctx.indent, key)?;
                    }
                    // The `Direct` strategy is not printed.
                    if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) {
                        writeln!(
                            f,
                            "{}temporal_bucketing_strategy={}",
                            ctx.indent, temporal_bucketing_strategy
                        )?;
                    }
                    // Identity key/value plans are abbreviated as `=id`.
                    if key_val_plan.key_plan.deref().is_identity() {
                        writeln!(f, "{}key_plan=id", ctx.indent)?;
                    } else {
                        writeln!(f, "{}key_plan", ctx.indent)?;
                        ctx.indented(|ctx| {
                            let key_plan = mode.expr(key_val_plan.key_plan.deref(), None);
                            key_plan.fmt_text(f, ctx)
                        })?;
                    }
                    if key_val_plan.val_plan.deref().is_identity() {
                        writeln!(f, "{}val_plan=id", ctx.indent)?;
                    } else {
                        writeln!(f, "{}val_plan", ctx.indent)?;
                        ctx.indented(|ctx| {
                            let val_plan = mode.expr(key_val_plan.val_plan.deref(), None);
                            val_plan.fmt_text(f, ctx)
                        })?;
                    }
                    if !mfp_after.is_identity() {
                        writeln!(f, "{}mfp_after", ctx.indent)?;
                        ctx.indented(|ctx| mode.expr(mfp_after, None).fmt_text(f, ctx))?;
                    }

                    input.fmt_text(f, ctx)
                })?;
            }
            TopK {
                input,
                top_k_plan,
                temporal_bucketing_strategy,
            } => {
                use crate::plan::top_k::TopKPlan;
                // Each arm writes the operator name and its attributes without a
                // trailing newline; the line is terminated after the `match`.
                match top_k_plan {
                    TopKPlan::MonotonicTop1(plan) => {
                        write!(f, "{}TopK::MonotonicTop1", ctx.indent)?;
                        if plan.group_key.len() > 0 {
                            let group_by = mode.seq(&plan.group_key, None);
                            let group_by = CompactScalars(group_by);
                            write!(f, " group_by=[{}]", group_by)?;
                        }
                        if plan.order_key.len() > 0 {
                            let order_by = mode.seq(&plan.order_key, None);
                            let order_by = separated(", ", order_by);
                            write!(f, " order_by=[{}]", order_by)?;
                        }
                        if plan.must_consolidate {
                            write!(f, " must_consolidate")?;
                        }
                    }
                    TopKPlan::MonotonicTopK(plan) => {
                        write!(f, "{}TopK::MonotonicTopK", ctx.indent)?;
                        if plan.group_key.len() > 0 {
                            let group_by = mode.seq(&plan.group_key, None);
                            let group_by = CompactScalars(group_by);
                            write!(f, " group_by=[{}]", group_by)?;
                        }
                        if plan.order_key.len() > 0 {
                            let order_by = mode.seq(&plan.order_key, None);
                            let order_by = separated(", ", order_by);
                            write!(f, " order_by=[{}]", order_by)?;
                        }
                        if let Some(limit) = &plan.limit {
                            let limit = mode.expr(limit, None);
                            write!(f, " limit={}", limit)?;
                        }
                        if plan.must_consolidate {
                            write!(f, " must_consolidate")?;
                        }
                    }
                    TopKPlan::Basic(plan) => {
                        write!(f, "{}TopK::Basic", ctx.indent)?;
                        if plan.group_key.len() > 0 {
                            let group_by = mode.seq(&plan.group_key, None);
                            let group_by = CompactScalars(group_by);
                            write!(f, " group_by=[{}]", group_by)?;
                        }
                        if plan.order_key.len() > 0 {
                            let order_by = mode.seq(&plan.order_key, None);
                            let order_by = separated(", ", order_by);
                            write!(f, " order_by=[{}]", order_by)?;
                        }
                        if let Some(limit) = &plan.limit {
                            let limit = mode.expr(limit, None);
                            write!(f, " limit={}", limit)?;
                        }
                        // A zero offset is not printed.
                        if &plan.offset > &0 {
                            write!(f, " offset={}", plan.offset)?;
                        }
                    }
                }
                writeln!(f, "{}", annotations)?;
                ctx.indented(|ctx| {
                    // The `Direct` strategy is not printed.
                    if !matches!(temporal_bucketing_strategy, ArrangementStrategy::Direct) {
                        writeln!(
                            f,
                            "{}temporal_bucketing_strategy={}",
                            ctx.indent, temporal_bucketing_strategy
                        )?;
                    }
                    input.fmt_text(f, ctx)
                })?;
            }
            Negate { input } => {
                writeln!(f, "{}Negate{}", ctx.indent, annotations)?;
                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
            }
            Threshold {
                input,
                threshold_plan,
            } => {
                use crate::plan::threshold::ThresholdPlan;
                match threshold_plan {
                    ThresholdPlan::Basic(plan) => {
                        // The ensured arrangement is printed inline on the operator line.
                        let ensure_arrangement = Arrangement::from(&plan.ensure_arrangement);
                        write!(f, "{}Threshold::Basic", ctx.indent)?;
                        write!(f, " ensure_arrangement=")?;
                        ensure_arrangement.fmt_text(f, ctx)?;
                        writeln!(f, "{}", annotations)?;
                    }
                };
                ctx.indented(|ctx| input.fmt_text(f, ctx))?;
            }
            Union {
                inputs,
                consolidate_output,
                temporal_bucketing_strategies,
            } => {
                // `consolidate_output` is only mentioned when it is set.
                if *consolidate_output {
                    writeln!(
                        f,
                        "{}Union consolidate_output={}{}",
                        ctx.indent, consolidate_output, annotations
                    )?;
                } else {
                    writeln!(f, "{}Union{}", ctx.indent, annotations)?;
                }
                ctx.indented(|ctx| {
                    // When at least one strategy is not `Direct`, the full list
                    // (including any `Direct` entries) is printed.
                    if temporal_bucketing_strategies
                        .iter()
                        .any(|s| !matches!(s, ArrangementStrategy::Direct))
                    {
                        let strategies = temporal_bucketing_strategies
                            .iter()
                            .map(|s| format!("{}", s))
                            .collect::<Vec<_>>()
                            .join(", ");
                        writeln!(
                            f,
                            "{}temporal_bucketing_strategies=[{}]",
                            ctx.indent, strategies
                        )?;
                    }
                    for input in inputs.iter() {
                        input.fmt_text(f, ctx)?;
                    }
                    Ok(())
                })?;
            }
            ArrangeBy {
                input_key,
                input,
                input_mfp,
                forms,
                strategy,
            } => {
                writeln!(f, "{}ArrangeBy{}", ctx.indent, annotations)?;
                ctx.indented(|ctx| {
                    if let Some(key) = input_key {
                        let key = mode.seq(key, None);
                        let key = CompactScalars(key);
                        writeln!(f, "{}input_key=[{}]", ctx.indent, key)?;
                    }
                    // NOTE(review): this uses the `Debug` form (`{:?}`) while the
                    // `Reduce`/`TopK`/`Union` arms use `Display` for their
                    // strategies — confirm this difference is intended.
                    if !matches!(strategy, ArrangementStrategy::Direct) {
                        writeln!(f, "{}strategy={:?}", ctx.indent, strategy)?;
                    }
                    mode.expr(input_mfp, None).fmt_text(f, ctx)?;
                    forms.fmt_text(f, ctx)?;
                    input.fmt_text(f, ctx)
                })?;
            }
        }

        Ok(())
    }
1026}
1027
1028impl DisplayText<PlanRenderingContext<'_, Plan>> for AvailableCollections {
1029 fn fmt_text(
1030 &self,
1031 f: &mut fmt::Formatter<'_>,
1032 ctx: &mut PlanRenderingContext<'_, Plan>,
1033 ) -> fmt::Result {
1034 if ctx.config.verbose_syntax {
1035 self.fmt_verbose_text(f, ctx)
1036 } else {
1037 self.fmt_default_text(f, ctx)
1038 }
1039 }
1040}
1041impl AvailableCollections {
1042 fn fmt_default_text(
1043 &self,
1044 f: &mut fmt::Formatter<'_>,
1045 ctx: &mut PlanRenderingContext<'_, Plan>,
1046 ) -> fmt::Result {
1047 let plural = if self.arranged.len() == 1 { "" } else { "s" };
1048 write!(
1049 f,
1050 "{}Keys: {} arrangement{plural} available",
1051 ctx.indent,
1052 self.arranged.len()
1053 )?;
1054
1055 if self.raw {
1056 writeln!(f, ", plus raw stream")?;
1057 } else {
1058 writeln!(f, ", no raw stream")?;
1059 }
1060
1061 ctx.indented(|ctx| {
1062 for (i, arrangement) in self.arranged.iter().enumerate() {
1063 let arrangement = Arrangement::from(arrangement);
1064 write!(f, "{}Arrangement {i}: ", ctx.indent)?;
1065 arrangement.fmt_text(f, ctx)?;
1066 writeln!(f, "")?;
1067 }
1068 Ok(())
1069 })?;
1070
1071 Ok(())
1072 }
1073
1074 fn fmt_verbose_text(
1075 &self,
1076 f: &mut fmt::Formatter<'_>,
1077 ctx: &mut PlanRenderingContext<'_, Plan>,
1078 ) -> fmt::Result {
1079 let raw = &self.raw;
1081 writeln!(f, "{}raw={}", ctx.indent, raw)?;
1082 for (i, arrangement) in self.arranged.iter().enumerate() {
1084 let arrangement = Arrangement::from(arrangement);
1085 write!(f, "{}arrangements[{}]=", ctx.indent, i)?;
1086 arrangement.fmt_text(f, ctx)?;
1087 writeln!(f, "")?;
1088 }
1089 Ok(())
1090 }
1091}
1092
1093fn fmt_join_chain<'a, I>(
1101 f: &mut fmt::Formatter<'_>,
1102 humanizer: &dyn ExprHumanizer,
1103 mode: &HumanizedExplain,
1104 inputs: &[Plan],
1105 source_relation: usize,
1106 source_key: Option<&'a Vec<MirScalarExpr>>,
1107 stages: I,
1108) -> fmt::Result
1109where
1110 I: IntoIterator<Item = (usize, &'a Vec<MirScalarExpr>)>,
1111{
1112 write!(
1113 f,
1114 "{}",
1115 humanize_input_name(humanizer, &inputs[source_relation], source_relation)
1116 )?;
1117 if let Some(key) = source_key {
1118 fmt_join_key_brackets(f, mode, key)?;
1119 }
1120 for (lookup_relation, lookup_key) in stages {
1121 write!(
1122 f,
1123 " » {}",
1124 humanize_input_name(humanizer, &inputs[lookup_relation], lookup_relation)
1125 )?;
1126 fmt_join_key_brackets(f, mode, lookup_key)?;
1127 }
1128 Ok(())
1129}
1130
1131fn fmt_join_key_brackets(
1133 f: &mut fmt::Formatter<'_>,
1134 mode: &HumanizedExplain,
1135 key: &Vec<MirScalarExpr>,
1136) -> fmt::Result {
1137 if key.is_empty() {
1138 write!(f, "[×]")
1139 } else {
1140 let key = CompactScalars(mode.seq(key, None));
1141 write!(f, "[{key}]")
1142 }
1143}
1144
1145fn humanize_input_name(humanizer: &dyn ExprHumanizer, plan: &Plan, pos: usize) -> String {
1149 fn dig(humanizer: &dyn ExprHumanizer, plan: &Plan) -> Option<String> {
1150 use crate::plan::PlanNode::*;
1151 match &plan.node {
1152 Get { id, .. } => match id {
1153 Id::Local(lid) => Some(lid.to_string()),
1154 Id::Global(gid) => Some(
1155 humanizer
1156 .humanize_id_unqualified(*gid)
1157 .unwrap_or_else(|| gid.to_string()),
1158 ),
1159 },
1160 ArrangeBy { input, .. } => dig(humanizer, input),
1162 Mfp { input, .. } => dig(humanizer, input),
1163 _ => None,
1164 }
1165 }
1166 match dig(humanizer, plan) {
1167 Some(name) => format!("%{pos}:{name}"),
1168 None => format!("%{pos}"),
1169 }
1170}
1171
1172impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearJoinPlan {
1173 fn fmt_text(
1174 &self,
1175 f: &mut fmt::Formatter<'_>,
1176 ctx: &mut PlanRenderingContext<'_, Plan>,
1177 ) -> fmt::Result {
1178 if ctx.config.verbose_syntax {
1179 self.fmt_verbose_text(f, ctx)
1180 } else {
1181 self.fmt_default_text(f, ctx)
1182 }
1183 }
1184}
1185impl LinearJoinPlan {
1186 fn has_cross_stage(&self) -> bool {
1188 self.stage_plans.iter().any(|s| s.lookup_key.is_empty())
1189 }
1190
1191 #[allow(clippy::needless_pass_by_ref_mut)]
1192 fn fmt_default_text(
1193 &self,
1194 f: &mut fmt::Formatter<'_>,
1195 ctx: &mut PlanRenderingContext<'_, Plan>,
1196 ) -> fmt::Result {
1197 for stage in self.stage_plans.iter() {
1201 if stage.closure.maps_or_filters() {
1202 writeln!(f, "{}after %{}:", ctx.indent, stage.lookup_relation)?;
1203 ctx.indented(|ctx| stage.closure.fmt_default_text(f, ctx))?;
1204 }
1205 }
1206 if let Some(final_closure) = &self.final_closure {
1207 if final_closure.maps_or_filters() {
1208 writeln!(f, "{}Final closure:", ctx.indent)?;
1209 ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))?;
1210 }
1211 }
1212 Ok(())
1213 }
1214
1215 fn fmt_verbose_text(
1216 &self,
1217 f: &mut fmt::Formatter<'_>,
1218 ctx: &mut PlanRenderingContext<'_, Plan>,
1219 ) -> fmt::Result {
1220 let mode = HumanizedExplain::new(ctx.config.redacted);
1221 let plan = self;
1222 if let Some(closure) = plan.final_closure.as_ref() {
1223 if !closure.is_identity() {
1224 writeln!(f, "{}final_closure", ctx.indent)?;
1225 ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1226 }
1227 }
1228 for (i, plan) in plan.stage_plans.iter().enumerate() {
1229 writeln!(f, "{}linear_stage[{}]", ctx.indent, i)?;
1230 ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1231 }
1232 if let Some(closure) = plan.initial_closure.as_ref() {
1233 if !closure.is_identity() {
1234 writeln!(f, "{}initial_closure", ctx.indent)?;
1235 ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1236 }
1237 }
1238 match &plan.source_key {
1239 Some(source_key) => {
1240 let source_key = mode.seq(source_key, None);
1241 let source_key = CompactScalars(source_key);
1242 writeln!(
1243 f,
1244 "{}source={{ relation={}, key=[{}] }}",
1245 ctx.indent, &plan.source_relation, source_key
1246 )?
1247 }
1248 None => writeln!(
1249 f,
1250 "{}source={{ relation={}, key=[] }}",
1251 ctx.indent, &plan.source_relation
1252 )?,
1253 };
1254 Ok(())
1255 }
1256}
1257
1258impl DisplayText<PlanRenderingContext<'_, Plan>> for LinearStagePlan {
1259 fn fmt_text(
1260 &self,
1261 f: &mut fmt::Formatter<'_>,
1262 ctx: &mut PlanRenderingContext<'_, Plan>,
1263 ) -> fmt::Result {
1264 if ctx.config.verbose_syntax {
1265 self.fmt_verbose_text(f, ctx)
1266 } else {
1267 self.fmt_default_text(f, ctx)
1268 }
1269 }
1270}
1271impl LinearStagePlan {
1272 #[allow(clippy::needless_pass_by_ref_mut)]
1273 fn fmt_default_text(
1274 &self,
1275 f: &mut fmt::Formatter<'_>,
1276 ctx: &mut PlanRenderingContext<'_, Plan>,
1277 ) -> fmt::Result {
1278 let lookup_relation = &self.lookup_relation;
1281 if !self.lookup_key.is_empty() {
1282 let lookup_key = CompactScalarSeq(&self.lookup_key);
1283 writeln!(
1284 f,
1285 "{}Lookup key {lookup_key} in %{lookup_relation}",
1286 ctx.indent
1287 )
1288 } else {
1289 writeln!(f, "{}Lookup in %{lookup_relation}", ctx.indent)
1290 }
1291 }
1292
1293 fn fmt_verbose_text(
1294 &self,
1295 f: &mut fmt::Formatter<'_>,
1296 ctx: &mut PlanRenderingContext<'_, Plan>,
1297 ) -> fmt::Result {
1298 let mode = HumanizedExplain::new(ctx.config.redacted);
1299
1300 let plan = self;
1301 if !plan.closure.is_identity() {
1302 writeln!(f, "{}closure", ctx.indent)?;
1303 ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1304 }
1305 {
1306 let lookup_relation = &plan.lookup_relation;
1307 let lookup_key = CompactScalarSeq(&plan.lookup_key);
1308 writeln!(
1309 f,
1310 "{}lookup={{ relation={}, key=[{}] }}",
1311 ctx.indent, lookup_relation, lookup_key
1312 )?;
1313 }
1314 {
1315 let stream_key = mode.seq(&plan.stream_key, None);
1316 let stream_key = CompactScalars(stream_key);
1317 let stream_thinning = Indices(&plan.stream_thinning);
1318 writeln!(
1319 f,
1320 "{}stream={{ key=[{}], thinning=({}) }}",
1321 ctx.indent, stream_key, stream_thinning
1322 )?;
1323 }
1324 Ok(())
1325 }
1326}
1327
1328impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaJoinPlan {
1329 fn fmt_text(
1330 &self,
1331 f: &mut fmt::Formatter<'_>,
1332 ctx: &mut PlanRenderingContext<'_, Plan>,
1333 ) -> fmt::Result {
1334 if ctx.config.verbose_syntax {
1335 self.fmt_verbose_text(f, ctx)
1336 } else {
1337 self.fmt_default_text(f, ctx)
1338 }
1339 }
1340}
1341impl DeltaJoinPlan {
1342 fn has_cross_stage(&self) -> bool {
1344 self.path_plans
1345 .iter()
1346 .any(|p| p.stage_plans.iter().any(|s| s.lookup_key.is_empty()))
1347 }
1348
1349 fn fmt_default_text(
1350 &self,
1351 f: &mut fmt::Formatter<'_>,
1352 ctx: &mut PlanRenderingContext<'_, Plan>,
1353 ) -> fmt::Result {
1354 for plan in self.path_plans.iter() {
1358 let has_stage_closure = plan.stage_plans.iter().any(|s| s.closure.maps_or_filters());
1359 let has_final_closure = plan
1360 .final_closure
1361 .as_ref()
1362 .is_some_and(|c| c.maps_or_filters());
1363 if !has_stage_closure && !has_final_closure {
1364 continue;
1365 }
1366 writeln!(f, "{}path %{}:", ctx.indent, plan.source_relation)?;
1367 ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1368 }
1369 Ok(())
1370 }
1371
1372 fn fmt_verbose_text(
1373 &self,
1374 f: &mut fmt::Formatter<'_>,
1375 ctx: &mut PlanRenderingContext<'_, Plan>,
1376 ) -> fmt::Result {
1377 for (i, plan) in self.path_plans.iter().enumerate() {
1378 writeln!(f, "{}plan_path[{}]", ctx.indent, i)?;
1379 ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1380 }
1381 Ok(())
1382 }
1383}
1384
1385impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaPathPlan {
1386 fn fmt_text(
1387 &self,
1388 f: &mut fmt::Formatter<'_>,
1389 ctx: &mut PlanRenderingContext<'_, Plan>,
1390 ) -> fmt::Result {
1391 if ctx.config.verbose_syntax {
1392 self.fmt_verbose_text(f, ctx)
1393 } else {
1394 self.fmt_default_text(f, ctx)
1395 }
1396 }
1397}
1398
1399impl DeltaPathPlan {
1400 #[allow(clippy::needless_pass_by_ref_mut)]
1401 fn fmt_default_text(
1402 &self,
1403 f: &mut fmt::Formatter<'_>,
1404 ctx: &mut PlanRenderingContext<'_, Plan>,
1405 ) -> fmt::Result {
1406 for stage in self.stage_plans.iter() {
1407 if stage.closure.maps_or_filters() {
1408 writeln!(f, "{}after %{}:", ctx.indent, stage.lookup_relation)?;
1409 ctx.indented(|ctx| stage.closure.fmt_default_text(f, ctx))?;
1410 }
1411 }
1412 if let Some(final_closure) = &self.final_closure {
1413 if final_closure.maps_or_filters() {
1414 writeln!(f, "{}Final closure:", ctx.indent)?;
1415 ctx.indented(|ctx| final_closure.fmt_default_text(f, ctx))?;
1416 }
1417 }
1418 Ok(())
1419 }
1420
1421 fn fmt_verbose_text(
1422 &self,
1423 f: &mut fmt::Formatter<'_>,
1424 ctx: &mut PlanRenderingContext<'_, Plan>,
1425 ) -> fmt::Result {
1426 let mode = HumanizedExplain::new(ctx.config.redacted);
1427 let plan = self;
1428 if let Some(closure) = plan.final_closure.as_ref() {
1429 if !closure.is_identity() {
1430 writeln!(f, "{}final_closure", ctx.indent)?;
1431 ctx.indented(|ctx| closure.fmt_text(f, ctx))?;
1432 }
1433 }
1434 for (i, plan) in plan.stage_plans.iter().enumerate().rev() {
1435 writeln!(f, "{}delta_stage[{}]", ctx.indent, i)?;
1436 ctx.indented(|ctx| plan.fmt_text(f, ctx))?;
1437 }
1438 if !plan.initial_closure.is_identity() {
1439 writeln!(f, "{}initial_closure", ctx.indent)?;
1440 ctx.indented(|ctx| plan.initial_closure.fmt_text(f, ctx))?;
1441 }
1442 {
1443 let source_relation = &plan.source_relation;
1444 let source_key = mode.seq(&plan.source_key, None);
1445 let source_key = CompactScalars(source_key);
1446 writeln!(
1447 f,
1448 "{}source={{ relation={}, key=[{}] }}",
1449 ctx.indent, source_relation, source_key
1450 )?;
1451 }
1452 Ok(())
1453 }
1454}
1455
1456impl DisplayText<PlanRenderingContext<'_, Plan>> for DeltaStagePlan {
1457 fn fmt_text(
1458 &self,
1459 f: &mut fmt::Formatter<'_>,
1460 ctx: &mut PlanRenderingContext<'_, Plan>,
1461 ) -> fmt::Result {
1462 if ctx.config.verbose_syntax {
1463 self.fmt_verbose_text(f, ctx)
1464 } else {
1465 self.fmt_default_text(f, ctx)
1466 }
1467 }
1468}
1469impl DeltaStagePlan {
1470 #[allow(clippy::needless_pass_by_ref_mut)]
1471 fn fmt_default_text(
1472 &self,
1473 f: &mut fmt::Formatter<'_>,
1474 ctx: &mut PlanRenderingContext<'_, Plan>,
1475 ) -> fmt::Result {
1476 let lookup_relation = &self.lookup_relation;
1479 let lookup_key = CompactScalarSeq(&self.lookup_key);
1480 writeln!(
1481 f,
1482 "{}Lookup key {lookup_key} in %{lookup_relation}",
1483 ctx.indent
1484 )
1485 }
1486
1487 fn fmt_verbose_text(
1488 &self,
1489 f: &mut fmt::Formatter<'_>,
1490 ctx: &mut PlanRenderingContext<'_, Plan>,
1491 ) -> fmt::Result {
1492 let mode = HumanizedExplain::new(ctx.config.redacted);
1493 let plan = self;
1494 if !plan.closure.is_identity() {
1495 writeln!(f, "{}closure", ctx.indent)?;
1496 ctx.indented(|ctx| plan.closure.fmt_text(f, ctx))?;
1497 }
1498 {
1499 let lookup_relation = &plan.lookup_relation;
1500 let lookup_key = mode.seq(&plan.lookup_key, None);
1501 let lookup_key = CompactScalars(lookup_key);
1502 writeln!(
1503 f,
1504 "{}lookup={{ relation={}, key=[{}] }}",
1505 ctx.indent, lookup_relation, lookup_key
1506 )?;
1507 }
1508 {
1509 let stream_key = mode.seq(&plan.stream_key, None);
1510 let stream_key = CompactScalars(stream_key);
1511 let stream_thinning = mode.seq(&plan.stream_thinning, None);
1512 let stream_thinning = CompactScalars(stream_thinning);
1513 writeln!(
1514 f,
1515 "{}stream={{ key=[{}], thinning=({}) }}",
1516 ctx.indent, stream_key, stream_thinning
1517 )?;
1518 }
1519 Ok(())
1520 }
1521}
1522
1523impl DisplayText<PlanRenderingContext<'_, Plan>> for JoinClosure {
1524 fn fmt_text(
1525 &self,
1526 f: &mut fmt::Formatter<'_>,
1527 ctx: &mut PlanRenderingContext<'_, Plan>,
1528 ) -> fmt::Result {
1529 if ctx.config.verbose_syntax {
1530 self.fmt_verbose_text(f, ctx)
1531 } else {
1532 self.fmt_default_text(f, ctx)
1533 }
1534 }
1535}
1536impl JoinClosure {
1537 fn fmt_default_text(
1538 &self,
1539 f: &mut fmt::Formatter<'_>,
1540 ctx: &mut PlanRenderingContext<'_, Plan>,
1541 ) -> fmt::Result {
1542 let mode = HumanizedExplain::new(ctx.config.redacted);
1543 if !self.before.is_identity() {
1544 mode.expr(self.before.deref(), None)
1545 .fmt_default_text(f, ctx)?;
1546 }
1547 if !self.ready_equivalences.is_empty() {
1548 let equivalences = separated(
1549 " AND ",
1550 self.ready_equivalences
1551 .iter()
1552 .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1553 );
1554 writeln!(f, "{}Equivalences: {equivalences}", ctx.indent)?;
1555 }
1556 Ok(())
1557 }
1558
1559 fn fmt_verbose_text(
1560 &self,
1561 f: &mut fmt::Formatter<'_>,
1562 ctx: &mut PlanRenderingContext<'_, Plan>,
1563 ) -> fmt::Result {
1564 let mode = HumanizedExplain::new(ctx.config.redacted);
1565 mode.expr(self.before.deref(), None).fmt_text(f, ctx)?;
1566 if !self.ready_equivalences.is_empty() {
1567 let equivalences = separated(
1568 " AND ",
1569 self.ready_equivalences
1570 .iter()
1571 .map(|equivalence| separated(" = ", mode.seq(equivalence, None))),
1572 );
1573 writeln!(f, "{}ready_equivalences={}", ctx.indent, equivalences)?;
1574 }
1575 Ok(())
1576 }
1577}
1578
1579impl DisplayText<PlanRenderingContext<'_, Plan>> for AccumulablePlan {
1580 fn fmt_text(
1581 &self,
1582 f: &mut fmt::Formatter<'_>,
1583 ctx: &mut PlanRenderingContext<'_, Plan>,
1584 ) -> fmt::Result {
1585 if ctx.config.verbose_syntax {
1586 self.fmt_verbose_text(f, ctx)
1587 } else {
1588 self.fmt_default_text(f, ctx)
1589 }
1590 }
1591}
1592impl AccumulablePlan {
1593 #[allow(clippy::needless_pass_by_ref_mut)]
1594 fn fmt_default_text(
1595 &self,
1596 f: &mut fmt::Formatter<'_>,
1597 ctx: &mut PlanRenderingContext<'_, Plan>,
1598 ) -> fmt::Result {
1599 let mode = HumanizedExplain::new(ctx.config.redacted);
1600
1601 if !self.simple_aggrs.is_empty() {
1602 let simple_aggrs = self
1603 .simple_aggrs
1604 .iter()
1605 .map(|(_i_datum, agg)| mode.expr(agg, None));
1606 let simple_aggrs = separated(", ", simple_aggrs);
1607 writeln!(f, "{}Simple aggregates: {simple_aggrs}", ctx.indent)?;
1608 }
1609
1610 if !self.distinct_aggrs.is_empty() {
1611 let distinct_aggrs = self
1612 .distinct_aggrs
1613 .iter()
1614 .map(|(_i_datum, agg)| mode.expr(agg, None));
1615 let distinct_aggrs = separated(", ", distinct_aggrs);
1616 writeln!(f, "{}Distinct aggregates: {distinct_aggrs}", ctx.indent)?;
1617 }
1618 Ok(())
1619 }
1620
1621 #[allow(clippy::needless_pass_by_ref_mut)]
1622 fn fmt_verbose_text(
1623 &self,
1624 f: &mut fmt::Formatter<'_>,
1625 ctx: &mut PlanRenderingContext<'_, Plan>,
1626 ) -> fmt::Result {
1627 let mode = HumanizedExplain::new(ctx.config.redacted);
1628 for (i, (i_datum, agg)) in self.simple_aggrs.iter().enumerate() {
1636 let agg = mode.expr(agg, None);
1637 write!(f, "{}simple_aggrs[{}]=", ctx.indent, i)?;
1638 writeln!(f, "({}, {})", i_datum, agg)?;
1639 }
1640 for (i, (i_datum, agg)) in self.distinct_aggrs.iter().enumerate() {
1642 let agg = mode.expr(agg, None);
1643 write!(f, "{}distinct_aggrs[{}]=", ctx.indent, i)?;
1644 writeln!(f, "({}, {})", i_datum, agg)?;
1645 }
1646 Ok(())
1647 }
1648}
1649
1650impl DisplayText<PlanRenderingContext<'_, Plan>> for HierarchicalPlan {
1651 fn fmt_text(
1652 &self,
1653 f: &mut fmt::Formatter<'_>,
1654 ctx: &mut PlanRenderingContext<'_, Plan>,
1655 ) -> fmt::Result {
1656 if ctx.config.verbose_syntax {
1657 self.fmt_verbose_text(f, ctx)
1658 } else {
1659 self.fmt_default_text(f, ctx)
1660 }
1661 }
1662}
1663impl HierarchicalPlan {
1664 #[allow(clippy::needless_pass_by_ref_mut)]
1665 fn fmt_default_text(
1666 &self,
1667 f: &mut fmt::Formatter<'_>,
1668 ctx: &mut PlanRenderingContext<'_, Plan>,
1669 ) -> fmt::Result {
1670 let mode = HumanizedExplain::new(ctx.config.redacted);
1671 let aggr_funcs = mode.seq(self.aggr_funcs(), None);
1672 let aggr_funcs = separated(", ", aggr_funcs);
1673 writeln!(f, "{}Aggregations: {aggr_funcs}", ctx.indent)
1674 }
1675
1676 #[allow(clippy::needless_pass_by_ref_mut)]
1677 fn fmt_verbose_text(
1678 &self,
1679 f: &mut fmt::Formatter<'_>,
1680 ctx: &mut PlanRenderingContext<'_, Plan>,
1681 ) -> fmt::Result {
1682 let mode = HumanizedExplain::new(ctx.config.redacted);
1683 match self {
1684 HierarchicalPlan::Monotonic(plan) => {
1685 let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1686 let aggr_funcs = separated(", ", aggr_funcs);
1687 writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1688 writeln!(f, "{}monotonic", ctx.indent)?;
1689 if plan.must_consolidate {
1690 writeln!(f, "{}must_consolidate", ctx.indent)?;
1691 }
1692 }
1693 HierarchicalPlan::Bucketed(plan) => {
1694 let aggr_funcs = mode.seq(&plan.aggr_funcs, None);
1695 let aggr_funcs = separated(", ", aggr_funcs);
1696 writeln!(f, "{}aggr_funcs=[{}]", ctx.indent, aggr_funcs)?;
1697 let buckets = separated(", ", &plan.buckets);
1698 writeln!(f, "{}buckets=[{}]", ctx.indent, buckets)?;
1699 }
1700 }
1701 Ok(())
1702 }
1703}
1704
1705impl DisplayText<PlanRenderingContext<'_, Plan>> for BasicPlan {
1706 fn fmt_text(
1707 &self,
1708 f: &mut fmt::Formatter<'_>,
1709 ctx: &mut PlanRenderingContext<'_, Plan>,
1710 ) -> fmt::Result {
1711 if ctx.config.verbose_syntax {
1712 self.fmt_verbose_text(f, ctx)
1713 } else {
1714 self.fmt_default_text(f, ctx)
1715 }
1716 }
1717}
1718impl BasicPlan {
1719 #[allow(clippy::needless_pass_by_ref_mut)]
1720 fn fmt_default_text(
1721 &self,
1722 f: &mut fmt::Formatter<'_>,
1723 ctx: &mut PlanRenderingContext<'_, Plan>,
1724 ) -> fmt::Result {
1725 let mode = HumanizedExplain::new(ctx.config.redacted);
1726 match self {
1727 BasicPlan::Single(SingleBasicPlan {
1728 expr,
1729 fused_unnest_list: _,
1730 }) => {
1731 let agg = mode.expr(expr, None);
1732 writeln!(f, "{}Aggregation: {agg}", ctx.indent)?;
1733 }
1734 BasicPlan::Multiple(aggs) => {
1735 let mode = HumanizedExplain::new(ctx.config.redacted);
1736 write!(f, "{}Aggregations:", ctx.indent)?;
1737
1738 for agg in aggs.iter() {
1739 let agg = mode.expr(agg, None);
1740 write!(f, " {agg}")?;
1741 }
1742 writeln!(f)?;
1743 }
1744 }
1745 Ok(())
1746 }
1747
1748 #[allow(clippy::needless_pass_by_ref_mut)]
1749 fn fmt_verbose_text(
1750 &self,
1751 f: &mut fmt::Formatter<'_>,
1752 ctx: &mut PlanRenderingContext<'_, Plan>,
1753 ) -> fmt::Result {
1754 let mode = HumanizedExplain::new(ctx.config.redacted);
1755 match self {
1756 BasicPlan::Single(SingleBasicPlan {
1757 expr,
1758 fused_unnest_list,
1759 }) => {
1760 let agg = mode.expr(expr, None);
1761 let fused_unnest_list = if *fused_unnest_list {
1762 ", fused_unnest_list=true"
1763 } else {
1764 ""
1765 };
1766 writeln!(f, "{}aggr=({}{})", ctx.indent, agg, fused_unnest_list)?;
1767 }
1768 BasicPlan::Multiple(aggs) => {
1769 for (i, agg) in aggs.iter().enumerate() {
1770 let agg = mode.expr(agg, None);
1771 writeln!(f, "{}aggrs[{}]={}", ctx.indent, i, agg)?;
1772 }
1773 }
1774 }
1775 Ok(())
1776 }
1777}
1778
1779struct Arrangement<'a> {
1781 key: &'a Vec<MirScalarExpr>,
1782 permutation: Permutation<'a>,
1783 thinning: &'a Vec<usize>,
1784}
1785
1786impl<'a> From<&'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)> for Arrangement<'a> {
1787 fn from(
1788 (key, permutation, thinning): &'a (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
1789 ) -> Self {
1790 Arrangement {
1791 key,
1792 permutation: Permutation(permutation),
1793 thinning,
1794 }
1795 }
1796}
1797
1798impl<'a> DisplayText<PlanRenderingContext<'_, Plan>> for Arrangement<'a> {
1799 fn fmt_text(
1800 &self,
1801 f: &mut fmt::Formatter<'_>,
1802 ctx: &mut PlanRenderingContext<'_, Plan>,
1803 ) -> fmt::Result {
1804 if ctx.config.verbose_syntax {
1805 self.fmt_verbose_text(f, ctx)
1806 } else {
1807 self.fmt_default_text(f, ctx)
1808 }
1809 }
1810}
1811
1812impl<'a> Arrangement<'a> {
1813 #[allow(clippy::needless_pass_by_ref_mut)]
1814 fn fmt_default_text(
1815 &self,
1816 f: &mut fmt::Formatter<'_>,
1817 ctx: &PlanRenderingContext<'_, Plan>,
1818 ) -> fmt::Result {
1819 let mode = HumanizedExplain::new(ctx.config.redacted);
1820 if !self.key.is_empty() {
1821 let key = mode.seq(self.key, None);
1822 let key = CompactScalars(key);
1823 write!(f, "{key}")
1824 } else {
1825 write!(f, "(empty key)")
1826 }
1827 }
1828
1829 #[allow(clippy::needless_pass_by_ref_mut)]
1830 fn fmt_verbose_text(
1831 &self,
1832 f: &mut fmt::Formatter<'_>,
1833 ctx: &PlanRenderingContext<'_, Plan>,
1834 ) -> fmt::Result {
1835 let mode = HumanizedExplain::new(ctx.config.redacted);
1836 let key = mode.seq(self.key, None);
1838 let key = CompactScalars(key);
1839 let permutation = &self.permutation;
1841 let thinning = Indices(self.thinning);
1843 write!(
1845 f,
1846 "{{ key=[{}], permutation={}, thinning=({}) }}",
1847 key, permutation, thinning
1848 )
1849 }
1850}
1851
/// Display wrapper around a column permutation given as a list of indices.
///
/// Renders as `id` when every position holds its own index (this includes the
/// empty list). Otherwise renders the positions that differ as
/// `{#<position>: #<value>, …}`, in position order.
struct Permutation<'a>(&'a Vec<usize>);

impl<'a> fmt::Display for Permutation<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only positions that do not map to themselves are worth printing.
        // They are streamed straight into the formatter rather than being
        // formatted into temporary strings first.
        let mut moved = self
            .0
            .iter()
            .enumerate()
            .filter(|(position, value)| position != *value)
            .peekable();

        if moved.peek().is_none() {
            return f.write_str("id");
        }

        f.write_str("{")?;
        for (n, (position, value)) in moved.enumerate() {
            if n > 0 {
                f.write_str(", ")?;
            }
            write!(f, "#{position}: #{value}")?;
        }
        f.write_str("}")
    }
}
1869
1870struct PlanAnnotations {
1872 config: ExplainConfig,
1873 node_id: LirId,
1874}
1875
1876impl PlanAnnotations {
1882 fn new(config: ExplainConfig, plan: &Plan) -> Self {
1883 let node_id = plan.lir_id;
1884 Self { config, node_id }
1885 }
1886}
1887
1888impl fmt::Display for PlanAnnotations {
1889 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1890 if self.config.node_ids {
1891 f.debug_struct(" //")
1892 .field("node_id", &self.node_id)
1893 .finish()
1894 } else {
1895 Ok(())
1897 }
1898 }
1899}