1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Derive insights for plans.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_expr::{
    AccessStrategy, AggregateExpr, AggregateFunc, Id, MirRelationExpr, OptimizedMirRelationExpr,
    RowSetFinishing,
};
use mz_ore::num::NonNeg;
use mz_repr::explain::ExprHumanizer;
use mz_repr::{GlobalId, Timestamp};
use mz_sql::ast::Statement;
use mz_sql::names::Aug;
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::HirRelationExpr;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use serde::Serialize;

use crate::catalog::Catalog;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::optimize::dataflows::ComputeInstanceSnapshot;
use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
use crate::session::SessionMeta;
use crate::TimestampContext;

/// Information needed to compute PlanInsights.
#[derive(Debug)]
pub struct PlanInsightsContext {
    /// The original SQL statement, if one is available for this plan.
    pub stmt: Option<Statement<Aug>>,
    /// The unoptimized HIR expression; re-optimized per cluster by
    /// `PlanInsights::compute_fast_path_clusters`.
    pub raw_expr: HirRelationExpr,
    /// Catalog snapshot used to resolve entries (e.g. index lookups).
    pub catalog: Arc<Catalog>,
    // Snapshots of all user compute instances.
    //
    // TODO: Avoid populating this if not needed. Maybe make this a method that can return a
    // ComputeInstanceSnapshot for a given cluster.
    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
    /// Name of the cluster the query actually targets; gets the LIMIT probe
    /// instead of the cross-cluster fast-path check.
    pub target_instance: String,
    /// Metrics handle passed to each per-cluster optimizer.
    pub metrics: OptimizerMetrics,
    /// Row-set finishing (ordering/limit/projection) of the original query.
    pub finishing: RowSetFinishing,
    pub optimizer_config: OptimizerConfig,
    pub session: SessionMeta,
    /// Timestamp context the original query was (or would be) executed at.
    pub timestamp_context: TimestampContext<Timestamp>,
    /// Transient IDs used by the optimizer pipeline for the view and index.
    pub view_id: GlobalId,
    pub index_id: GlobalId,
    /// Whether re-optimization is permitted at all — NOTE(review): read by
    /// callers outside this file; not consulted in this module.
    pub enable_re_optimize: bool,
}

/// Insights about an optimized plan.
#[derive(Clone, Debug, Default, Serialize)]
pub struct PlanInsights {
    /// Collections imported by the plan.
    ///
    /// Each key is the ID of an imported collection (stringified `GlobalId`),
    /// and each value contains further insights about each collection and how
    /// it is used by the plan.
    pub imports: BTreeMap<String, ImportInsights>,
    /// If this plan is not fast path, this is the map of cluster names to indexes that would render
    /// this as fast path. That is: if this query were run on the cluster of the key, it would be
    /// fast because it would use the index of the value. A `None` value means the
    /// plan was fast path on that cluster but not via an existing index.
    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
    /// For the current cluster, whether adding a LIMIT <= this will result in a fast path.
    pub fast_path_limit: Option<usize>,
    /// Names of persist sources over which a count(*) is done.
    pub persist_count: Vec<Name>,
}

/// Identifies the index that would make a query fast path on some cluster.
#[derive(Clone, Debug, Serialize)]
pub struct FastPathCluster {
    /// Name of the index the fast-path peek would use.
    index: Name,
    /// Name of the collection the index is built on.
    on: Name,
}

impl PlanInsights {
    /// Re-optimizes the query against every user compute instance to discover
    /// where it would be fast path.
    ///
    /// For each non-target cluster, records in `fast_path_clusters` whether
    /// the plan becomes fast path there (and, for `PeekExisting`, which index
    /// it would use). For the target cluster, instead probes whether adding a
    /// `LIMIT 1` yields a fast path, and if so records the configured persist
    /// fast-path limit in `fast_path_limit`.
    ///
    /// Optimization runs on blocking tasks since it is CPU-bound; tasks that
    /// fail to optimize are silently skipped (best effort).
    pub async fn compute_fast_path_clusters(
        &mut self,
        humanizer: &dyn ExprHumanizer,
        ctx: Box<PlanInsightsContext>,
    ) {
        let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
        let tasks = ctx
            .compute_instances
            .into_iter()
            .map(|(name, compute_instance)| {
                let raw_expr = ctx.raw_expr.clone();
                let mut finishing = ctx.finishing.clone();
                // For the current cluster, try adding a LIMIT to see if it fast paths with PeekPersist.
                if name == ctx.target_instance {
                    finishing.limit = Some(NonNeg::try_from(1).expect("non-negative"));
                }
                let session = Arc::clone(&session);
                let timestamp_context = ctx.timestamp_context.clone();
                let mut optimizer = optimize::peek::Optimizer::new(
                    Arc::clone(&ctx.catalog),
                    compute_instance,
                    finishing,
                    ctx.view_id,
                    ctx.index_id,
                    ctx.optimizer_config.clone(),
                    ctx.metrics.clone(),
                );
                mz_ore::task::spawn_blocking(
                    || "compute fast path clusters",
                    move || {
                        // HIR ⇒ MIR lowering and MIR optimization (local)
                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                        // Attach resolved context required to continue the pipeline.
                        let local_mir_plan = local_mir_plan.resolve(
                            timestamp_context,
                            &*session,
                            Box::new(EmptyStatisticsOracle {}),
                        );
                        // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
                        let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
                        Ok::<_, OptimizerError>((name, global_lir_plan))
                    },
                )
            });
        for task in tasks {
            let res = task.await;
            // Best effort: ignore join errors and optimizer errors alike.
            let Ok(Ok((name, plan))) = res else {
                continue;
            };
            let (plan, _, _) = plan.unapply();
            if let PeekPlan::FastPath(plan) = plan {
                // Same-cluster optimization is the LIMIT check.
                if name == ctx.target_instance {
                    self.fast_path_limit =
                        Some(ctx.optimizer_config.features.persist_fast_path_limit);
                    continue;
                }
                let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
                    let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
                    Some(FastPathCluster {
                        index: structured_name(humanizer, idx_id),
                        on: structured_name(
                            humanizer,
                            idx_entry.index().expect("must be index").on,
                        ),
                    })
                } else {
                    // This shouldn't ever happen (changing the cluster should not affect whether a
                    // fast path of type constant or persist peek is created), but protect against
                    // it anyway.
                    None
                };
                self.fast_path_clusters.insert(name, idx_name);
            }
        }
    }
}

/// Insights about an imported collection in a plan.
#[derive(Clone, Debug, Serialize)]
pub struct ImportInsights {
    /// The full (database/schema/item) name of the imported collection.
    pub name: Name,
    /// The type of the imported collection (serialized as `"type"`).
    #[serde(rename = "type")]
    pub ty: ImportType,
}

/// The type of an imported collection.
///
/// Serialized in kebab-case (`compute` / `storage`).
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ImportType {
    /// A compute collection — i.e., an index.
    Compute,
    /// A storage collection: a table, source, or materialized view.
    Storage,
}

/// The name of a collection.
///
/// Any part that could not be resolved is `None` and is omitted from the
/// serialized form (see the `skip_serializing_if` attributes).
#[derive(Debug, Clone, Serialize)]
pub struct Name {
    /// The database name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database: Option<String>,
    /// The schema name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema: Option<String>,
    /// The item name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub item: Option<String>,
}

/// Derives [`PlanInsights`] from whichever plan is available.
///
/// A fast-path plan takes precedence over a global (dataflow) plan; if
/// neither is present there is nothing to report and `None` is returned.
pub fn plan_insights(
    humanizer: &dyn ExprHumanizer,
    global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
    fast_path_plan: Option<FastPathPlan>,
) -> Option<PlanInsights> {
    match fast_path_plan {
        Some(fast_path) => Some(fast_path_insights(humanizer, fast_path)),
        None => global_plan.map(|plan| global_insights(humanizer, plan)),
    }
}

/// Builds [`PlanInsights`] for a fast-path plan by recording the single
/// collection, if any, that the plan reads from.
fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
    let mut insights = PlanInsights::default();
    // A constant plan imports nothing; each peek variant imports exactly one
    // collection — an index (compute) or a persist shard (storage).
    let import = match plan {
        FastPathPlan::Constant { .. } => None,
        FastPathPlan::PeekExisting(_, id, _, _) => Some((id, ImportType::Compute)),
        FastPathPlan::PeekPersist(id, _) => Some((id, ImportType::Storage)),
    };
    if let Some((id, ty)) = import {
        add_import_insights(&mut insights, humanizer, id, ty);
    }
    insights
}

/// Builds [`PlanInsights`] for a non-fast-path (global dataflow) plan.
///
/// Records every source import as storage and every index import as compute,
/// then scans each object to build for `count(*)` aggregations applied
/// directly to a persist read, recording the source's name in
/// `persist_count`.
fn global_insights(
    humanizer: &dyn ExprHumanizer,
    plan: DataflowDescription<OptimizedMirRelationExpr>,
) -> PlanInsights {
    let mut insights = PlanInsights::default();
    for (id, _) in plan.source_imports {
        add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
    }
    for (id, _) in plan.index_imports {
        add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
    }
    for BuildDesc { plan, .. } in plan.objects_to_build {
        // Search for a count(*) over a persist read.
        // Each `let else`/`if` below bails out of the closure (i.e., skips
        // this subexpression) as soon as the shape stops matching.
        plan.visit_pre(|expr| {
            // Must be a Reduce ...
            let MirRelationExpr::Reduce {
                input,
                group_key,
                aggregates,
                ..
            } = expr
            else {
                return;
            };
            // ... with no GROUP BY (a full-relation aggregate) ...
            if !group_key.is_empty() {
                return;
            }
            // ... over a projection away of all columns (count(*) keeps none) ...
            let MirRelationExpr::Project { input, outputs } = &**input else {
                return;
            };
            if !outputs.is_empty() {
                return;
            }
            // ... of a Get that is read directly from persist ...
            let MirRelationExpr::Get {
                id: Id::Global(id),
                access_strategy: AccessStrategy::Persist,
                ..
            } = &**input
            else {
                return;
            };
            // ... computing exactly one aggregate ...
            let [aggregate] = aggregates.as_slice() else {
                return;
            };
            // ... which is a non-distinct count ...
            let AggregateExpr {
                func: AggregateFunc::Count,
                distinct: false,
                expr,
            } = aggregate
            else {
                return;
            };
            // ... of the literal `true` (how count(*) is represented in MIR).
            if !expr.is_literal_true() {
                return;
            }
            let name = structured_name(humanizer, *id);
            insights.persist_count.push(name);
        });
    }
    insights
}

/// Records into `insights` that the plan imports collection `id` with import
/// type `ty`, keyed by the stringified global ID.
fn add_import_insights(
    insights: &mut PlanInsights,
    humanizer: &dyn ExprHumanizer,
    id: GlobalId,
    ty: ImportType,
) {
    let name = structured_name(humanizer, id);
    let entry = ImportInsights { name, ty };
    insights.imports.insert(id.to_string(), entry);
}

/// Resolves `id` to a structured [`Name`] via the humanizer.
///
/// Pops name parts from the back, so the most specific field (`item`) is
/// filled first; a partial result (or an unknown ID, which yields no parts)
/// leaves the remaining fields `None`. NOTE(review): this assumes
/// `humanize_id_parts` returns parts ordered database, schema, item —
/// confirm against its implementation.
fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
    // `unwrap_or_default` is the idiomatic "empty Vec on failure" fallback
    // (clippy prefers it over `unwrap_or(Vec::new())`).
    let mut parts = humanizer.humanize_id_parts(id).unwrap_or_default();
    Name {
        item: parts.pop(),
        schema: parts.pop(),
        database: parts.pop(),
    }
}