1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
17use mz_expr::{AccessStrategy, Id, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
18use mz_ore::num::NonNeg;
19use mz_repr::GlobalId;
20use mz_repr::explain::ExprHumanizer;
21use mz_sql::ast::Statement;
22use mz_sql::names::Aug;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::EmptyStatisticsOracle;
27use serde::Serialize;
28
29use crate::TimestampContext;
30use crate::catalog::Catalog;
31use crate::coord::peek::{FastPathPlan, PeekPlan};
32use crate::optimize::dataflows::ComputeInstanceSnapshot;
33use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
34use crate::session::SessionMeta;
35
/// Everything [`PlanInsights::compute_fast_path_clusters`] needs in order to re-optimize a
/// query against each available compute instance.
#[derive(Debug)]
pub struct PlanInsightsContext {
    /// The statement associated with the query, if any.
    // NOTE(review): not read by the code in this file section — confirm where it is consumed.
    pub stmt: Option<Statement<Aug>>,
    /// The query's HIR expression. A clone is optimized once per compute instance.
    pub raw_expr: HirRelationExpr,
    /// Catalog handed to the peek optimizer and used to look up index entries.
    pub catalog: Arc<Catalog>,
    /// Compute instance snapshots to optimize against, keyed by cluster name.
    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
    /// Name of the cluster the query targets. On this cluster the query is optimized with a
    /// finishing limit of 1, to see whether a `LIMIT` would make it fast path.
    pub target_instance: String,
    /// Metrics handle cloned into each peek optimizer.
    pub metrics: OptimizerMetrics,
    /// The query's row-set finishing. It is cloned per instance (and given a limit of 1 on
    /// the target instance).
    pub finishing: RowSetFinishing,
    /// Optimizer configuration. Its feature flags gate the computation and supply the
    /// persist fast-path limit.
    pub optimizer_config: OptimizerConfig,
    /// Session metadata used when resolving the local MIR plan.
    pub session: SessionMeta,
    /// Timestamp context used when resolving the local MIR plan.
    pub timestamp_context: TimestampContext,
    /// Id passed as the view id to the peek optimizer.
    pub view_id: GlobalId,
    /// Id passed as the index id to the peek optimizer.
    pub index_id: GlobalId,
    // NOTE(review): not read by the code in this file section — confirm its consumer.
    pub enable_re_optimize: bool,
}
57
/// Structured, serializable observations about how a query was (or could be) planned.
#[derive(Clone, Debug, Default, Serialize)]
pub struct PlanInsights {
    /// Collections imported by the plan, keyed by the string form of their `GlobalId`.
    pub imports: BTreeMap<String, ImportInsights>,
    /// Clusters other than the target on which the query optimizes to a fast-path peek,
    /// keyed by cluster name. The value is `Some` when that fast path peeks an existing
    /// index, and `None` for any other fast-path plan.
    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
    /// Set to the configured `persist_fast_path_limit` when giving the query a limit of 1
    /// makes it fast path on the target cluster.
    pub fast_path_limit: Option<usize>,
    /// Names of collections read by a global `count(*)` (no grouping key) over an empty
    /// projection of a persist-backed `Get`.
    pub persist_count: Vec<Name>,
}
76
/// The index that would serve a fast-path peek on some cluster.
#[derive(Clone, Debug, Serialize)]
pub struct FastPathCluster {
    /// Name of the index being peeked.
    index: Name,
    /// Name of the object the index is built on.
    on: Name,
}
82
83impl PlanInsights {
84 pub async fn compute_fast_path_clusters(
85 &mut self,
86 humanizer: &dyn ExprHumanizer,
87 ctx: Box<PlanInsightsContext>,
88 ) {
89 if !ctx.optimizer_config.features.enable_fast_path_plan_insights {
93 return;
94 }
95 let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
96 let tasks = ctx
97 .compute_instances
98 .into_iter()
99 .map(|(name, compute_instance)| {
100 let raw_expr = ctx.raw_expr.clone();
101 let mut finishing = ctx.finishing.clone();
102 if name == ctx.target_instance {
104 finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
105 }
106 let session = Arc::clone(&session);
107 let timestamp_context = ctx.timestamp_context.clone();
108 let mut optimizer = optimize::peek::Optimizer::new(
109 Arc::clone(&ctx.catalog),
110 compute_instance,
111 finishing,
112 ctx.view_id,
113 ctx.index_id,
114 ctx.optimizer_config.clone(),
115 ctx.metrics.clone(),
116 );
117 mz_ore::task::spawn_blocking(
118 || "compute fast path clusters",
119 move || {
120 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
122 let local_mir_plan = local_mir_plan.resolve(
124 timestamp_context,
125 &*session,
126 Box::new(EmptyStatisticsOracle {}),
127 );
128 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
130 Ok::<_, OptimizerError>((name, global_lir_plan))
131 },
132 )
133 });
134 for task in tasks {
135 let res = task.await;
136 let Ok((name, plan)) = res else {
137 continue;
138 };
139 let (plan, _, _) = plan.unapply();
140 if let PeekPlan::FastPath(plan) = plan {
141 if name == ctx.target_instance {
143 self.fast_path_limit =
144 Some(ctx.optimizer_config.features.persist_fast_path_limit);
145 continue;
146 }
147 let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
148 let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
149 Some(FastPathCluster {
150 index: structured_name(humanizer, idx_id),
151 on: structured_name(
152 humanizer,
153 idx_entry.index().expect("must be index").on,
154 ),
155 })
156 } else {
157 None
161 };
162 self.fast_path_clusters.insert(name, idx_name);
163 }
164 }
165 }
166}
167
/// Describes one collection imported by a plan.
#[derive(Clone, Debug, Serialize)]
pub struct ImportInsights {
    /// Structured name of the imported collection.
    pub name: Name,
    /// Whether the import is a compute or a storage import. Serialized under the key `type`.
    #[serde(rename = "type")]
    pub ty: ImportType,
}
177
/// The kind of collection an import reads. Serialized in kebab-case (`"compute"`,
/// `"storage"`).
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ImportType {
    /// An index import: dataflow index imports, or the index peeked by a `PeekExisting`
    /// fast path.
    Compute,
    /// A storage import: dataflow source imports, or the collection read by a
    /// `PeekPersist` fast path.
    Storage,
}
187
/// A possibly partially qualified object name. Each component is omitted from the
/// serialized output when absent.
#[derive(Debug, Clone, Serialize)]
pub struct Name {
    /// Database component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database: Option<String>,
    /// Schema component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema: Option<String>,
    /// Item (object) component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub item: Option<String>,
}
201
202pub fn plan_insights(
203 humanizer: &dyn ExprHumanizer,
204 global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
205 fast_path_plan: Option<FastPathPlan>,
206) -> Option<PlanInsights> {
207 match (global_plan, fast_path_plan) {
208 (None, None) => None,
209 (None | Some(_), Some(fast_path_plan)) => {
210 Some(fast_path_insights(humanizer, fast_path_plan))
211 }
212 (Some(global_plan), None) => Some(global_insights(humanizer, global_plan)),
213 }
214}
215
216fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
217 let mut insights = PlanInsights::default();
218 match plan {
219 FastPathPlan::Constant { .. } => (),
220 FastPathPlan::PeekExisting(_, id, _, _) => {
221 add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
222 }
223 FastPathPlan::PeekPersist(id, _, _) => {
224 add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
225 }
226 }
227 insights
228}
229
230fn global_insights(
231 humanizer: &dyn ExprHumanizer,
232 plan: DataflowDescription<OptimizedMirRelationExpr>,
233) -> PlanInsights {
234 let mut insights = PlanInsights::default();
235 for (id, _) in plan.source_imports {
236 add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
237 }
238 for (id, _) in plan.index_imports {
239 add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
240 }
241 for BuildDesc { plan, .. } in plan.objects_to_build {
242 plan.visit_pre(|expr| {
244 let MirRelationExpr::Reduce {
245 input,
246 group_key,
247 aggregates,
248 ..
249 } = expr
250 else {
251 return;
252 };
253 if !group_key.is_empty() {
254 return;
255 }
256 let MirRelationExpr::Project { input, outputs } = &**input else {
257 return;
258 };
259 if !outputs.is_empty() {
260 return;
261 }
262 let MirRelationExpr::Get {
263 id: Id::Global(id),
264 access_strategy: AccessStrategy::Persist,
265 ..
266 } = &**input
267 else {
268 return;
269 };
270 let [aggregate] = aggregates.as_slice() else {
271 return;
272 };
273 if !aggregate.is_count_asterisk() {
274 return;
275 }
276 let name = structured_name(humanizer, *id);
277 insights.persist_count.push(name);
278 });
279 }
280 insights
281}
282
283fn add_import_insights(
284 insights: &mut PlanInsights,
285 humanizer: &dyn ExprHumanizer,
286 id: GlobalId,
287 ty: ImportType,
288) {
289 insights.imports.insert(
290 id.to_string(),
291 ImportInsights {
292 name: structured_name(humanizer, id),
293 ty,
294 },
295 );
296}
297
298fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
299 let mut parts = humanizer.humanize_id_parts(id).unwrap_or(Vec::new());
300 Name {
301 item: parts.pop(),
302 schema: parts.pop(),
303 database: parts.pop(),
304 }
305}