mz_adapter/explain/
insights.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Derive insights for plans.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
17use mz_expr::{AccessStrategy, Id, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
18use mz_ore::num::NonNeg;
19use mz_repr::explain::ExprHumanizer;
20use mz_repr::{GlobalId, Timestamp};
21use mz_sql::ast::Statement;
22use mz_sql::names::Aug;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::EmptyStatisticsOracle;
27use serde::Serialize;
28
29use crate::TimestampContext;
30use crate::catalog::Catalog;
31use crate::coord::peek::{FastPathPlan, PeekPlan};
32use crate::optimize::dataflows::ComputeInstanceSnapshot;
33use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
34use crate::session::SessionMeta;
35
/// Information needed to compute PlanInsights.
///
/// An owned bundle of inputs that [`PlanInsights::compute_fast_path_clusters`] consumes in
/// order to re-run the peek optimizer once per entry of `compute_instances`.
#[derive(Debug)]
pub struct PlanInsightsContext {
    /// The statement the insights are derived for, if any.
    ///
    /// NOTE(review): not read anywhere in this file; presumably consumed by callers -- confirm.
    pub stmt: Option<Statement<Aug>>,
    /// The HIR expression that is re-optimized for each compute instance.
    pub raw_expr: HirRelationExpr,
    /// The catalog handed to each optimizer and used to resolve index entries.
    pub catalog: Arc<Catalog>,
    /// Snapshots of all user compute instances, keyed by the name that is compared against
    /// `target_instance` and reported in [`PlanInsights::fast_path_clusters`].
    //
    // TODO: Avoid populating this if not needed. Maybe make this a method that can return a
    // ComputeInstanceSnapshot for a given cluster.
    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
    /// The name of the current cluster. The entry of `compute_instances` with this name is
    /// optimized with an added `LIMIT` instead of being checked for an index fast path.
    pub target_instance: String,
    /// Optimizer metrics, cloned into every optimizer that is created.
    pub metrics: OptimizerMetrics,
    /// The finishing of the query, cloned into every optimizer that is created (with the
    /// limit overridden for the `target_instance`).
    pub finishing: RowSetFinishing,
    /// The optimizer configuration, cloned into every optimizer that is created. Its
    /// `features.persist_fast_path_limit` is what gets reported as
    /// [`PlanInsights::fast_path_limit`].
    pub optimizer_config: OptimizerConfig,
    /// Session metadata used when resolving the locally optimized MIR plan.
    pub session: SessionMeta,
    /// Timestamp context used when resolving the locally optimized MIR plan.
    pub timestamp_context: TimestampContext<Timestamp>,
    /// The view ID passed to the peek optimizer constructor.
    pub view_id: GlobalId,
    /// The index ID passed to the peek optimizer constructor.
    pub index_id: GlobalId,
    /// NOTE(review): not read anywhere in this file; presumably a caller-side switch that
    /// gates whether the re-optimization is run at all -- confirm.
    pub enable_re_optimize: bool,
}
57
/// Insights about an optimized plan.
///
/// [`plan_insights`] fills in `imports` and `persist_count`. The two `fast_path_*` fields keep
/// their default (empty) values until [`PlanInsights::compute_fast_path_clusters`] is run.
#[derive(Clone, Debug, Default, Serialize)]
pub struct PlanInsights {
    /// Collections imported by the plan.
    ///
    /// Each key is the ID of an imported collection, and each value contains
    /// further insights about each collection and how it is used by the plan.
    pub imports: BTreeMap<String, ImportInsights>,
    /// If this plan is not fast path, this is the map of cluster names to indexes that would render
    /// this as fast path. That is: if this query were run on the cluster of the key, it would be
    /// fast because it would use the index of the value.
    ///
    /// The value is `None` if the query is fast path on that cluster for a reason other than
    /// peeking an existing index.
    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
    /// For the current cluster: adding a `LIMIT` less than or equal to this value will result in
    /// a fast path. `None` if no such limit was found.
    pub fast_path_limit: Option<usize>,
    /// Names of persist sources over which a count(*) is done.
    pub persist_count: Vec<Name>,
}
75
/// Describes the index that would make a query fast path on some cluster.
#[derive(Clone, Debug, Serialize)]
pub struct FastPathCluster {
    /// The name of the index that the fast path plan peeks.
    index: Name,
    /// The name of the object that the index is defined on.
    on: Name,
}
81
82impl PlanInsights {
83    pub async fn compute_fast_path_clusters(
84        &mut self,
85        humanizer: &dyn ExprHumanizer,
86        ctx: Box<PlanInsightsContext>,
87    ) {
88        let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
89        let tasks = ctx
90            .compute_instances
91            .into_iter()
92            .map(|(name, compute_instance)| {
93                let raw_expr = ctx.raw_expr.clone();
94                let mut finishing = ctx.finishing.clone();
95                // For the current cluster, try adding a LIMIT to see if it fast paths with PeekPersist.
96                if name == ctx.target_instance {
97                    finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
98                }
99                let session = Arc::clone(&session);
100                let timestamp_context = ctx.timestamp_context.clone();
101                let mut optimizer = optimize::peek::Optimizer::new(
102                    Arc::clone(&ctx.catalog),
103                    compute_instance,
104                    finishing,
105                    ctx.view_id,
106                    ctx.index_id,
107                    ctx.optimizer_config.clone(),
108                    ctx.metrics.clone(),
109                );
110                mz_ore::task::spawn_blocking(
111                    || "compute fast path clusters",
112                    move || {
113                        // HIR ⇒ MIR lowering and MIR optimization (local)
114                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
115                        // Attach resolved context required to continue the pipeline.
116                        let local_mir_plan = local_mir_plan.resolve(
117                            timestamp_context,
118                            &*session,
119                            Box::new(EmptyStatisticsOracle {}),
120                        );
121                        // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
122                        let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
123                        Ok::<_, OptimizerError>((name, global_lir_plan))
124                    },
125                )
126            });
127        for task in tasks {
128            let res = task.await;
129            let Ok(Ok((name, plan))) = res else {
130                continue;
131            };
132            let (plan, _, _) = plan.unapply();
133            if let PeekPlan::FastPath(plan) = plan {
134                // Same-cluster optimization is the LIMIT check.
135                if name == ctx.target_instance {
136                    self.fast_path_limit =
137                        Some(ctx.optimizer_config.features.persist_fast_path_limit);
138                    continue;
139                }
140                let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
141                    let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
142                    Some(FastPathCluster {
143                        index: structured_name(humanizer, idx_id),
144                        on: structured_name(
145                            humanizer,
146                            idx_entry.index().expect("must be index").on,
147                        ),
148                    })
149                } else {
150                    // This shouldn't ever happen (changing the cluster should not affect whether a
151                    // fast path of type constant or persist peek is created), but protect against
152                    // it anyway.
153                    None
154                };
155                self.fast_path_clusters.insert(name, idx_name);
156            }
157        }
158    }
159}
160
/// Insights about an imported collection in a plan.
///
/// Stored as the values of [`PlanInsights::imports`].
#[derive(Clone, Debug, Serialize)]
pub struct ImportInsights {
    /// The full name of the imported collection.
    pub name: Name,
    /// The type of the imported collection. Serialized under the key `type`.
    #[serde(rename = "type")]
    pub ty: ImportType,
}
170
/// The type of an imported collection.
///
/// Variant names are serialized in kebab-case.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ImportType {
    /// A compute collection--i.e., an index.
    Compute,
    /// A storage collection: a table, source, or materialized view.
    Storage,
}
180
/// The name of a collection.
///
/// Every component is optional; a component that is `None` is omitted from the serialized
/// output entirely.
#[derive(Debug, Clone, Serialize)]
pub struct Name {
    /// The database name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database: Option<String>,
    /// The schema name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema: Option<String>,
    /// The item name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub item: Option<String>,
}
194
195pub fn plan_insights(
196    humanizer: &dyn ExprHumanizer,
197    global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
198    fast_path_plan: Option<FastPathPlan>,
199) -> Option<PlanInsights> {
200    match (global_plan, fast_path_plan) {
201        (None, None) => None,
202        (None | Some(_), Some(fast_path_plan)) => {
203            Some(fast_path_insights(humanizer, fast_path_plan))
204        }
205        (Some(global_plan), None) => Some(global_insights(humanizer, global_plan)),
206    }
207}
208
209fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
210    let mut insights = PlanInsights::default();
211    match plan {
212        FastPathPlan::Constant { .. } => (),
213        FastPathPlan::PeekExisting(_, id, _, _) => {
214            add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
215        }
216        FastPathPlan::PeekPersist(id, _, _) => {
217            add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
218        }
219    }
220    insights
221}
222
223fn global_insights(
224    humanizer: &dyn ExprHumanizer,
225    plan: DataflowDescription<OptimizedMirRelationExpr>,
226) -> PlanInsights {
227    let mut insights = PlanInsights::default();
228    for (id, _) in plan.source_imports {
229        add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
230    }
231    for (id, _) in plan.index_imports {
232        add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
233    }
234    for BuildDesc { plan, .. } in plan.objects_to_build {
235        // Search for a count(*) over a persist read.
236        plan.visit_pre(|expr| {
237            let MirRelationExpr::Reduce {
238                input,
239                group_key,
240                aggregates,
241                ..
242            } = expr
243            else {
244                return;
245            };
246            if !group_key.is_empty() {
247                return;
248            }
249            let MirRelationExpr::Project { input, outputs } = &**input else {
250                return;
251            };
252            if !outputs.is_empty() {
253                return;
254            }
255            let MirRelationExpr::Get {
256                id: Id::Global(id),
257                access_strategy: AccessStrategy::Persist,
258                ..
259            } = &**input
260            else {
261                return;
262            };
263            let [aggregate] = aggregates.as_slice() else {
264                return;
265            };
266            if !aggregate.is_count_asterisk() {
267                return;
268            }
269            let name = structured_name(humanizer, *id);
270            insights.persist_count.push(name);
271        });
272    }
273    insights
274}
275
276fn add_import_insights(
277    insights: &mut PlanInsights,
278    humanizer: &dyn ExprHumanizer,
279    id: GlobalId,
280    ty: ImportType,
281) {
282    insights.imports.insert(
283        id.to_string(),
284        ImportInsights {
285            name: structured_name(humanizer, id),
286            ty,
287        },
288    );
289}
290
291fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
292    let mut parts = humanizer.humanize_id_parts(id).unwrap_or(Vec::new());
293    Name {
294        item: parts.pop(),
295        schema: parts.pop(),
296        database: parts.pop(),
297    }
298}