1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Tracing utilities for explainable plans.

use std::fmt::Debug;

use mz_compute_client::{plan::Plan, types::dataflows::DataflowDescription};
use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
use mz_repr::explain_new::{
    text_string, DisplayText, Explain, ExplainConfig, ExplainError, ExplainFormat, PlanTrace,
    TraceEntry,
};
use mz_sql::plan::{HirRelationExpr, HirScalarExpr};
use tracing::dispatcher::{self, with_default};
use tracing_subscriber::prelude::*;

use crate::{catalog::ConnCatalog, coord::peek::FastPathPlan};

use super::{ExplainContext, Explainable, UsedIndexes};

/// Provides functionality for tracing plans generated by the execution of an
/// optimization pipeline.
///
/// This is a newtype around a [`dispatcher::Dispatch`]. Internally, the
/// dispatcher wraps a layered [`tracing::subscriber::Subscriber`] consisting
/// of one [`PlanTrace`] layer for each supported plan type `T`.
///
/// Typical usage: create an instance with [`OptimizerTrace::new`] (or with
/// [`OptimizerTrace::find`] to restrict collection to a given path), run the
/// pipeline inside [`OptimizerTrace::collect_trace`], and finally call
/// [`OptimizerTrace::drain_all`] to obtain the collected trace as a vector of
/// [`TraceEntry`] instances. Note that `drain_all` consumes the instance.
pub(crate) struct OptimizerTrace(dispatcher::Dispatch);

impl OptimizerTrace {
    /// Create a new [`OptimizerTrace`].
    ///
    /// The wrapped dispatcher is backed by a `tracing_subscriber::registry()`
    /// onto which one [`PlanTrace`] layer per supported plan type is stacked.
    pub fn new() -> OptimizerTrace {
        // Layers for `explain_plan` types that the regular explain path does
        // not use, but which are handy when instrumenting code for debugging
        // purposes.
        let debug_layers = tracing_subscriber::registry()
            .with(PlanTrace::<String>::new())
            .with(PlanTrace::<HirScalarExpr>::new())
            .with(PlanTrace::<MirScalarExpr>::new());

        // Layers for `explain_plan` types that the regular explain path uses.
        let subscriber = debug_layers
            .with(PlanTrace::<HirRelationExpr>::new())
            .with(PlanTrace::<MirRelationExpr>::new())
            .with(PlanTrace::<DataflowDescription<OptimizedMirRelationExpr>>::new())
            .with(PlanTrace::<DataflowDescription<Plan>>::new());

        let dispatch = dispatcher::Dispatch::new(subscriber);
        OptimizerTrace(dispatch)
    }

    /// Create a new [`OptimizerTrace`] restricted to a single `path`.
    ///
    /// Every layer is built with `PlanTrace::find(path)` instead of
    /// `PlanTrace::new()`, so only [`TraceEntry`] instances along the prefix
    /// of the given `path` are accumulated.
    pub fn find(path: &'static str) -> OptimizerTrace {
        // Layers for `explain_plan` types that the regular explain path does
        // not use, but which are handy when instrumenting code for debugging
        // purposes.
        let debug_layers = tracing_subscriber::registry()
            .with(PlanTrace::<String>::find(path))
            .with(PlanTrace::<HirScalarExpr>::find(path))
            .with(PlanTrace::<MirScalarExpr>::find(path));

        // Layers for `explain_plan` types that the regular explain path uses.
        let subscriber = debug_layers
            .with(PlanTrace::<HirRelationExpr>::find(path))
            .with(PlanTrace::<MirRelationExpr>::find(path))
            .with(PlanTrace::<DataflowDescription<OptimizedMirRelationExpr>>::find(path))
            .with(PlanTrace::<DataflowDescription<Plan>>::find(path));

        let dispatch = dispatcher::Dispatch::new(subscriber);
        OptimizerTrace(dispatch)
    }

    /// Run the given optimization `pipeline` once and collect a trace of all
    /// plans produced during that run.
    pub fn collect_trace<T>(&self, pipeline: impl FnOnce() -> T) -> T {
        with_default(&self.0, pipeline)
    }

    /// Collect all traced plans for all plan types `T` that are available in
    /// the wrapped [`dispatcher::Dispatch`].
    pub fn drain_all(
        self,
        format: ExplainFormat,
        config: ExplainConfig,
        catalog: ConnCatalog,
        row_set_finishing: Option<RowSetFinishing>,
        used_indexes: Vec<mz_repr::GlobalId>,
        fast_path_plan: Option<FastPathPlan>,
    ) -> Result<Vec<TraceEntry<String>>, ExplainError> {
        let mut results = vec![];

        let context = ExplainContext {
            config: &config,
            humanizer: &catalog,
            used_indexes: UsedIndexes::new(vec![]),
            finishing: row_set_finishing.clone(),
        };

        // Drain trace entries of types produced by local optimizer stages.
        results.extend(itertools::chain!(
            self.drain_explainable_entries::<HirRelationExpr>(&format, &context, &None)?,
            self.drain_explainable_entries::<MirRelationExpr>(&format, &context, &None)?,
        ));

        // Drain trace entries of types produced by global optimizer stages.
        let context = ExplainContext {
            config: &config,
            humanizer: &catalog,
            used_indexes: UsedIndexes::new(used_indexes),
            finishing: row_set_finishing,
        };
        let fast_path_plan = match fast_path_plan {
            Some(mut plan) if !context.config.no_fast_path => {
                Some(Explainable::new(&mut plan).explain(&format, context.config, &context)?)
            }
            _ => None,
        };
        results.extend(itertools::chain!(
            self.drain_explainable_entries::<DataflowDescription<OptimizedMirRelationExpr>>(
                &format,
                &context,
                &fast_path_plan
            )?,
            self.drain_explainable_entries::<DataflowDescription<Plan>>(
                &format,
                &context,
                &fast_path_plan
            )?,
        ));

        // Drain trace entries of type String, HirScalarExpr, MirScalarExpr
        // which are useful for ad-hoc debugging.
        results.extend(itertools::chain!(
            self.drain_scalar_entries::<HirScalarExpr>(),
            self.drain_scalar_entries::<MirScalarExpr>(),
            self.drain_string_entries(),
        ));

        // sort plans by instant (TODO: this can be implemented in a more
        // efficient way, as we can assume that each of the runs that are used
        // to `*.extend` the `results` vector is already sorted).
        results.sort_by_key(|x| x.instant);

        Ok(results)
    }

    /// Collect all trace entries of a plan type `T` that implements
    /// [`Explainable`], converting each traced plan into its explained text.
    ///
    /// If `fast_path_plan` is given and `context.config.no_fast_path` is not
    /// set, the text of every returned entry is a copy of `fast_path_plan`
    /// and the traced plan itself is not explained. Otherwise each traced plan
    /// is explained using `format` and `context`.
    ///
    /// # Errors
    ///
    /// Returns the first [`ExplainError`] raised while explaining an entry.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped dispatcher has no [`PlanTrace`] layer for `T`.
    fn drain_explainable_entries<T>(
        &self,
        format: &ExplainFormat,
        context: &ExplainContext,
        fast_path_plan: &Option<String>,
    ) -> Result<Vec<TraceEntry<String>>, ExplainError>
    where
        T: Clone + Debug + 'static,
        for<'a> Explainable<'a, T>: Explain<'a, Context = ExplainContext<'a>>,
    {
        let trace = match self.0.downcast_ref::<PlanTrace<T>>() {
            Some(trace) => trace,
            None => unreachable!("drain_explainable_entries called with wrong plan type T"),
        };

        // The configuration does not change while draining, so decide once
        // whether the fast path text replaces the traced plans.
        let replacement = fast_path_plan
            .as_ref()
            .filter(|_| !context.config.no_fast_path);

        let mut explained = Vec::new();
        for mut entry in trace.drain_as_vec() {
            let plan = match replacement {
                Some(text) => text.clone(),
                None => Explainable::new(&mut entry.plan).explain(
                    format,
                    context.config,
                    context,
                )?,
            };
            explained.push(TraceEntry {
                instant: entry.instant,
                duration: entry.duration,
                path: entry.path,
                plan,
            });
        }
        Ok(explained)
    }

    /// Collect all trace entries of a plan type `T`, rendering each traced
    /// plan with [`text_string`].
    ///
    /// Returns an empty vector if the wrapped dispatcher has no [`PlanTrace`]
    /// layer for `T`.
    fn drain_scalar_entries<T>(&self) -> Vec<TraceEntry<String>>
    where
        T: Clone + Debug + 'static,
        T: DisplayText,
    {
        // Re-wrap an entry, swapping the typed plan for its text form.
        let render = |TraceEntry {
                          instant,
                          duration,
                          path,
                          plan,
                      }: TraceEntry<T>| TraceEntry {
            instant,
            duration,
            path,
            plan: text_string(&plan),
        };

        match self.0.downcast_ref::<PlanTrace<T>>() {
            None => Vec::new(),
            Some(trace) => trace.drain_as_vec().into_iter().map(render).collect(),
        }
    }

    /// Collect all trace entries with plans of type [`String`].
    ///
    /// These entries already carry text, so they are returned as drained.
    /// Returns an empty vector if the wrapped dispatcher has no
    /// `PlanTrace<String>` layer.
    fn drain_string_entries(&self) -> Vec<TraceEntry<String>> {
        self.0
            .downcast_ref::<PlanTrace<String>>()
            .map_or_else(Vec::new, |trace| trace.drain_as_vec())
    }
}