Skip to main content

mz_adapter/explain/
insights.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Derive insights for plans.

12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
17use mz_expr::{AccessStrategy, Id, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
18use mz_ore::num::NonNeg;
19use mz_repr::GlobalId;
20use mz_repr::explain::ExprHumanizer;
21use mz_sql::ast::Statement;
22use mz_sql::names::Aug;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::EmptyStatisticsOracle;
27use serde::Serialize;
28
29use crate::TimestampContext;
30use crate::catalog::Catalog;
31use crate::coord::peek::{FastPathPlan, PeekPlan};
32use crate::optimize::dataflows::ComputeInstanceSnapshot;
33use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
34use crate::session::SessionMeta;
35
36/// Information needed to compute PlanInsights.
37#[derive(Debug)]
38pub struct PlanInsightsContext {
39    pub stmt: Option<Statement<Aug>>,
40    pub raw_expr: HirRelationExpr,
41    pub catalog: Arc<Catalog>,
42    // Snapshots of all user compute instances.
43    //
44    // TODO: Avoid populating this if not needed. Maybe make this a method that can return a
45    // ComputeInstanceSnapshot for a given cluster.
46    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
47    pub target_instance: String,
48    pub metrics: OptimizerMetrics,
49    pub finishing: RowSetFinishing,
50    pub optimizer_config: OptimizerConfig,
51    pub session: SessionMeta,
52    pub timestamp_context: TimestampContext,
53    pub view_id: GlobalId,
54    pub index_id: GlobalId,
55    pub enable_re_optimize: bool,
56}
57
58/// Insights about an optimized plan.
59#[derive(Clone, Debug, Default, Serialize)]
60pub struct PlanInsights {
61    /// Collections imported by the plan.
62    ///
63    /// Each key is the ID of an imported collection, and each value contains
64    /// further insights about each collection and how it is used by the plan.
65    pub imports: BTreeMap<String, ImportInsights>,
66    /// If this plan is not fast path, this is the map of cluster names to indexes that would render
67    /// this as fast path. That is: if this query were run on the cluster of the key, it would be
68    /// fast because it would use the index of the value.
69    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
70    /// For the current cluster, adding a `LIMIT n` such that `n + OFFSET < this` will result in a
71    /// fast path. (If the query has no `OFFSET`, the condition simplifies to `LIMIT < this`.)
72    pub fast_path_limit: Option<usize>,
73    /// Names of persist sources over which a count(*) is done.
74    pub persist_count: Vec<Name>,
75}
76
77#[derive(Clone, Debug, Serialize)]
78pub struct FastPathCluster {
79    index: Name,
80    on: Name,
81}
82
83impl PlanInsights {
84    pub async fn compute_fast_path_clusters(
85        &mut self,
86        humanizer: &dyn ExprHumanizer,
87        ctx: Box<PlanInsightsContext>,
88    ) {
89        // Warning: This function is currently dangerous, because it does optimizer work
90        // proportional to the number of clusters, and does so on the main coordinator task.
91        // see https://github.com/MaterializeInc/database-issues/issues/9492
92        if !ctx.optimizer_config.features.enable_fast_path_plan_insights {
93            return;
94        }
95        let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
96        let tasks = ctx
97            .compute_instances
98            .into_iter()
99            .map(|(name, compute_instance)| {
100                let raw_expr = ctx.raw_expr.clone();
101                let mut finishing = ctx.finishing.clone();
102                // For the current cluster, try adding a LIMIT to see if it fast paths with PeekPersist.
103                if name == ctx.target_instance {
104                    finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
105                }
106                let session = Arc::clone(&session);
107                let timestamp_context = ctx.timestamp_context.clone();
108                let mut optimizer = optimize::peek::Optimizer::new(
109                    Arc::clone(&ctx.catalog),
110                    compute_instance,
111                    finishing,
112                    ctx.view_id,
113                    ctx.index_id,
114                    ctx.optimizer_config.clone(),
115                    ctx.metrics.clone(),
116                );
117                mz_ore::task::spawn_blocking(
118                    || "compute fast path clusters",
119                    move || {
120                        // HIR ⇒ MIR lowering and MIR optimization (local)
121                        let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
122                        // Attach resolved context required to continue the pipeline.
123                        let local_mir_plan = local_mir_plan.resolve(
124                            timestamp_context,
125                            &*session,
126                            Box::new(EmptyStatisticsOracle {}),
127                        );
128                        // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global)
129                        let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
130                        Ok::<_, OptimizerError>((name, global_lir_plan))
131                    },
132                )
133            });
134        for task in tasks {
135            let res = task.await;
136            let Ok((name, plan)) = res else {
137                continue;
138            };
139            let (plan, _, _) = plan.unapply();
140            if let PeekPlan::FastPath(plan) = plan {
141                // Same-cluster optimization is the LIMIT check.
142                if name == ctx.target_instance {
143                    self.fast_path_limit =
144                        Some(ctx.optimizer_config.features.persist_fast_path_limit);
145                    continue;
146                }
147                let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
148                    let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
149                    Some(FastPathCluster {
150                        index: structured_name(humanizer, idx_id),
151                        on: structured_name(
152                            humanizer,
153                            idx_entry.index().expect("must be index").on,
154                        ),
155                    })
156                } else {
157                    // This shouldn't ever happen (changing the cluster should not affect whether a
158                    // fast path of type constant or persist peek is created), but protect against
159                    // it anyway.
160                    None
161                };
162                self.fast_path_clusters.insert(name, idx_name);
163            }
164        }
165    }
166}
167
168/// Insights about an imported collection in a plan.
169#[derive(Clone, Debug, Serialize)]
170pub struct ImportInsights {
171    /// The full name of the imported collection.
172    pub name: Name,
173    /// The type of the imported collection.
174    #[serde(rename = "type")]
175    pub ty: ImportType,
176}
177
178/// The type of an imported collection.
179#[derive(Clone, Copy, Debug, Serialize)]
180#[serde(rename_all = "kebab-case")]
181pub enum ImportType {
182    /// A compute collection--i.e., an index.
183    Compute,
184    /// A storage collection: a table, source, or materialized view.
185    Storage,
186}
187
188/// The name of a collection.
189#[derive(Debug, Clone, Serialize)]
190pub struct Name {
191    /// The database name.
192    #[serde(skip_serializing_if = "Option::is_none")]
193    pub database: Option<String>,
194    /// The schema name.
195    #[serde(skip_serializing_if = "Option::is_none")]
196    pub schema: Option<String>,
197    /// The item name.
198    #[serde(skip_serializing_if = "Option::is_none")]
199    pub item: Option<String>,
200}
201
202pub fn plan_insights(
203    humanizer: &dyn ExprHumanizer,
204    global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
205    fast_path_plan: Option<FastPathPlan>,
206) -> Option<PlanInsights> {
207    match (global_plan, fast_path_plan) {
208        (None, None) => None,
209        (None | Some(_), Some(fast_path_plan)) => {
210            Some(fast_path_insights(humanizer, fast_path_plan))
211        }
212        (Some(global_plan), None) => Some(global_insights(humanizer, global_plan)),
213    }
214}
215
216fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
217    let mut insights = PlanInsights::default();
218    match plan {
219        FastPathPlan::Constant { .. } => (),
220        FastPathPlan::PeekExisting(_, id, _, _) => {
221            add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
222        }
223        FastPathPlan::PeekPersist(id, _, _) => {
224            add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
225        }
226    }
227    insights
228}
229
230fn global_insights(
231    humanizer: &dyn ExprHumanizer,
232    plan: DataflowDescription<OptimizedMirRelationExpr>,
233) -> PlanInsights {
234    let mut insights = PlanInsights::default();
235    for (id, _) in plan.source_imports {
236        add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
237    }
238    for (id, _) in plan.index_imports {
239        add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
240    }
241    for BuildDesc { plan, .. } in plan.objects_to_build {
242        // Search for a count(*) over a persist read.
243        plan.visit_pre(|expr| {
244            let MirRelationExpr::Reduce {
245                input,
246                group_key,
247                aggregates,
248                ..
249            } = expr
250            else {
251                return;
252            };
253            if !group_key.is_empty() {
254                return;
255            }
256            let MirRelationExpr::Project { input, outputs } = &**input else {
257                return;
258            };
259            if !outputs.is_empty() {
260                return;
261            }
262            let MirRelationExpr::Get {
263                id: Id::Global(id),
264                access_strategy: AccessStrategy::Persist,
265                ..
266            } = &**input
267            else {
268                return;
269            };
270            let [aggregate] = aggregates.as_slice() else {
271                return;
272            };
273            if !aggregate.is_count_asterisk() {
274                return;
275            }
276            let name = structured_name(humanizer, *id);
277            insights.persist_count.push(name);
278        });
279    }
280    insights
281}
282
283fn add_import_insights(
284    insights: &mut PlanInsights,
285    humanizer: &dyn ExprHumanizer,
286    id: GlobalId,
287    ty: ImportType,
288) {
289    insights.imports.insert(
290        id.to_string(),
291        ImportInsights {
292            name: structured_name(humanizer, id),
293            ty,
294        },
295    );
296}
297
298fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
299    let mut parts = humanizer.humanize_id_parts(id).unwrap_or(Vec::new());
300    Name {
301        item: parts.pop(),
302        schema: parts.pop(),
303        database: parts.pop(),
304    }
305}