mz_adapter/explain/
insights.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Derive insights for plans.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
17use mz_expr::{AccessStrategy, Id, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
18use mz_ore::num::NonNeg;
19use mz_repr::explain::ExprHumanizer;
20use mz_repr::{GlobalId, Timestamp};
21use mz_sql::ast::Statement;
22use mz_sql::names::Aug;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::EmptyStatisticsOracle;
27use serde::Serialize;
28
29use crate::TimestampContext;
30use crate::catalog::Catalog;
31use crate::coord::peek::{FastPathPlan, PeekPlan};
32use crate::optimize::dataflows::ComputeInstanceSnapshot;
33use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
34use crate::session::SessionMeta;
35
36/// Information needed to compute PlanInsights.
37#[derive(Debug)]
38pub struct PlanInsightsContext {
39    pub stmt: Option<Statement<Aug>>,
40    pub raw_expr: HirRelationExpr,
41    pub catalog: Arc<Catalog>,
42    // Snapshots of all user compute instances.
43    //
44    // TODO: Avoid populating this if not needed. Maybe make this a method that can return a
45    // ComputeInstanceSnapshot for a given cluster.
46    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
47    pub target_instance: String,
48    pub metrics: OptimizerMetrics,
49    pub finishing: RowSetFinishing,
50    pub optimizer_config: OptimizerConfig,
51    pub session: SessionMeta,
52    pub timestamp_context: TimestampContext<Timestamp>,
53    pub view_id: GlobalId,
54    pub index_id: GlobalId,
55    pub enable_re_optimize: bool,
56}
57
58/// Insights about an optimized plan.
59#[derive(Clone, Debug, Default, Serialize)]
60pub struct PlanInsights {
61    /// Collections imported by the plan.
62    ///
63    /// Each key is the ID of an imported collection, and each value contains
64    /// further insights about each collection and how it is used by the plan.
65    pub imports: BTreeMap<String, ImportInsights>,
66    /// If this plan is not fast path, this is the map of cluster names to indexes that would render
67    /// this as fast path. That is: if this query were run on the cluster of the key, it would be
68    /// fast because it would use the index of the value.
69    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
70    /// For the current cluster, whether adding a LIMIT <= this will result in a fast path.
71    pub fast_path_limit: Option<usize>,
72    /// Names of persist sources over which a count(*) is done.
73    pub persist_count: Vec<Name>,
74}
75
76#[derive(Clone, Debug, Serialize)]
77pub struct FastPathCluster {
78    index: Name,
79    on: Name,
80}
81
82impl PlanInsights {
83    pub async fn compute_fast_path_clusters(
84        &mut self,
85        humanizer: &dyn ExprHumanizer,
86        ctx: Box<PlanInsightsContext>,
87    ) {
88        // Warning: This function is currently dangerous, because it does optimizer work
89        // proportional to the number of clusters, and does so on the main coordinator task.
90        // see https://github.com/MaterializeInc/database-issues/issues/9492
91        if !ctx.optimizer_config.features.enable_fast_path_plan_insights {
92            return;
93        }
94        let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
95        let tasks = ctx
96            .compute_instances
97            .into_iter()
98            .map(|(name, compute_instance)| {
99                let raw_expr = ctx.raw_expr.clone();
100                let mut finishing = ctx.finishing.clone();
101                // For the current cluster, try adding a LIMIT to see if it fast paths with PeekPersist.
102                if name == ctx.target_instance {
103                    finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
104                }
105                let session = Arc::clone(&session);
106                let timestamp_context = ctx.timestamp_context.clone();
107                let mut optimizer = optimize::peek::Optimizer::new(
108                    Arc::clone(&ctx.catalog),
109                    compute_instance,
110                    finishing,
111                    ctx.view_id,
112                    ctx.index_id,
113                    ctx.optimizer_config.clone(),
114                    ctx.metrics.clone(),
115                );
116                mz_ore::task::spawn_blocking(
117                    || "compute fast path clusters",
118                    move || {
119                        // HIR ⇒ MIR lowering and MIR optimization (local)
120                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
121                        // Attach resolved context required to continue the pipeline.
122                        let local_mir_plan = local_mir_plan.resolve(
123                            timestamp_context,
124                            &*session,
125                            Box::new(EmptyStatisticsOracle {}),
126                        );
127                        // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
128                        let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
129                        Ok::<_, OptimizerError>((name, global_lir_plan))
130                    },
131                )
132            });
133        for task in tasks {
134            let res = task.await;
135            let Ok(Ok((name, plan))) = res else {
136                continue;
137            };
138            let (plan, _, _) = plan.unapply();
139            if let PeekPlan::FastPath(plan) = plan {
140                // Same-cluster optimization is the LIMIT check.
141                if name == ctx.target_instance {
142                    self.fast_path_limit =
143                        Some(ctx.optimizer_config.features.persist_fast_path_limit);
144                    continue;
145                }
146                let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
147                    let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
148                    Some(FastPathCluster {
149                        index: structured_name(humanizer, idx_id),
150                        on: structured_name(
151                            humanizer,
152                            idx_entry.index().expect("must be index").on,
153                        ),
154                    })
155                } else {
156                    // This shouldn't ever happen (changing the cluster should not affect whether a
157                    // fast path of type constant or persist peek is created), but protect against
158                    // it anyway.
159                    None
160                };
161                self.fast_path_clusters.insert(name, idx_name);
162            }
163        }
164    }
165}
166
167/// Insights about an imported collection in a plan.
168#[derive(Clone, Debug, Serialize)]
169pub struct ImportInsights {
170    /// The full name of the imported collection.
171    pub name: Name,
172    /// The type of the imported collection.
173    #[serde(rename = "type")]
174    pub ty: ImportType,
175}
176
177/// The type of an imported collection.
178#[derive(Clone, Copy, Debug, Serialize)]
179#[serde(rename_all = "kebab-case")]
180pub enum ImportType {
181    /// A compute collection--i.e., an index.
182    Compute,
183    /// A storage collection: a table, source, or materialized view.
184    Storage,
185}
186
187/// The name of a collection.
188#[derive(Debug, Clone, Serialize)]
189pub struct Name {
190    /// The database name.
191    #[serde(skip_serializing_if = "Option::is_none")]
192    pub database: Option<String>,
193    /// The schema name.
194    #[serde(skip_serializing_if = "Option::is_none")]
195    pub schema: Option<String>,
196    /// The item name.
197    #[serde(skip_serializing_if = "Option::is_none")]
198    pub item: Option<String>,
199}
200
201pub fn plan_insights(
202    humanizer: &dyn ExprHumanizer,
203    global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
204    fast_path_plan: Option<FastPathPlan>,
205) -> Option<PlanInsights> {
206    match (global_plan, fast_path_plan) {
207        (None, None) => None,
208        (None | Some(_), Some(fast_path_plan)) => {
209            Some(fast_path_insights(humanizer, fast_path_plan))
210        }
211        (Some(global_plan), None) => Some(global_insights(humanizer, global_plan)),
212    }
213}
214
215fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
216    let mut insights = PlanInsights::default();
217    match plan {
218        FastPathPlan::Constant { .. } => (),
219        FastPathPlan::PeekExisting(_, id, _, _) => {
220            add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
221        }
222        FastPathPlan::PeekPersist(id, _, _) => {
223            add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
224        }
225    }
226    insights
227}
228
229fn global_insights(
230    humanizer: &dyn ExprHumanizer,
231    plan: DataflowDescription<OptimizedMirRelationExpr>,
232) -> PlanInsights {
233    let mut insights = PlanInsights::default();
234    for (id, _) in plan.source_imports {
235        add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
236    }
237    for (id, _) in plan.index_imports {
238        add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
239    }
240    for BuildDesc { plan, .. } in plan.objects_to_build {
241        // Search for a count(*) over a persist read.
242        plan.visit_pre(|expr| {
243            let MirRelationExpr::Reduce {
244                input,
245                group_key,
246                aggregates,
247                ..
248            } = expr
249            else {
250                return;
251            };
252            if !group_key.is_empty() {
253                return;
254            }
255            let MirRelationExpr::Project { input, outputs } = &**input else {
256                return;
257            };
258            if !outputs.is_empty() {
259                return;
260            }
261            let MirRelationExpr::Get {
262                id: Id::Global(id),
263                access_strategy: AccessStrategy::Persist,
264                ..
265            } = &**input
266            else {
267                return;
268            };
269            let [aggregate] = aggregates.as_slice() else {
270                return;
271            };
272            if !aggregate.is_count_asterisk() {
273                return;
274            }
275            let name = structured_name(humanizer, *id);
276            insights.persist_count.push(name);
277        });
278    }
279    insights
280}
281
282fn add_import_insights(
283    insights: &mut PlanInsights,
284    humanizer: &dyn ExprHumanizer,
285    id: GlobalId,
286    ty: ImportType,
287) {
288    insights.imports.insert(
289        id.to_string(),
290        ImportInsights {
291            name: structured_name(humanizer, id),
292            ty,
293        },
294    );
295}
296
297fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
298    let mut parts = humanizer.humanize_id_parts(id).unwrap_or(Vec::new());
299    Name {
300        item: parts.pop(),
301        schema: parts.pop(),
302        database: parts.pop(),
303    }
304}