1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
17use mz_expr::{AccessStrategy, Id, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
18use mz_ore::num::NonNeg;
19use mz_repr::explain::ExprHumanizer;
20use mz_repr::{GlobalId, Timestamp};
21use mz_sql::ast::Statement;
22use mz_sql::names::Aug;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::EmptyStatisticsOracle;
27use serde::Serialize;
28
29use crate::TimestampContext;
30use crate::catalog::Catalog;
31use crate::coord::peek::{FastPathPlan, PeekPlan};
32use crate::optimize::dataflows::ComputeInstanceSnapshot;
33use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
34use crate::session::SessionMeta;
35
/// Inputs needed to compute fast-path plan insights across clusters; consumed
/// by [`PlanInsights::compute_fast_path_clusters`].
#[derive(Debug)]
pub struct PlanInsightsContext {
    /// The statement being explained, if any. Not read by
    /// `compute_fast_path_clusters`.
    pub stmt: Option<Statement<Aug>>,
    /// The HIR expression to optimize; cloned once per cluster.
    pub raw_expr: HirRelationExpr,
    /// Catalog handed to each per-cluster optimizer, and used to look up the
    /// object a fast-path index is on.
    pub catalog: Arc<Catalog>,
    /// Snapshots of the clusters to check, keyed by cluster name. The query is
    /// optimized once against each entry.
    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
    /// Name of the cluster the query targets. On this cluster the query is
    /// optimized with a `LIMIT 1` finishing, and a fast-path result is
    /// reported via `PlanInsights::fast_path_limit` instead of
    /// `PlanInsights::fast_path_clusters`.
    pub target_instance: String,
    /// Optimizer metrics, cloned into each per-cluster optimizer.
    pub metrics: OptimizerMetrics,
    /// Row-set finishing of the query; cloned once per cluster.
    pub finishing: RowSetFinishing,
    /// Optimizer configuration. Its feature flags gate the whole computation
    /// and supply the reported fast-path limit.
    pub optimizer_config: OptimizerConfig,
    /// Session metadata used when resolving each local MIR plan.
    pub session: SessionMeta,
    /// Timestamp context used when resolving each local MIR plan.
    pub timestamp_context: TimestampContext<Timestamp>,
    /// Passed through to `optimize::peek::Optimizer::new`.
    pub view_id: GlobalId,
    /// Passed through to `optimize::peek::Optimizer::new`.
    pub index_id: GlobalId,
    /// Not read by `compute_fast_path_clusters`.
    pub enable_re_optimize: bool,
}
57
/// Insights derived from a query plan.
#[derive(Clone, Debug, Default, Serialize)]
pub struct PlanInsights {
    /// Collections the plan imports, keyed by the string form of their
    /// [`GlobalId`].
    pub imports: BTreeMap<String, ImportInsights>,
    /// Clusters, other than the target cluster, on which the query would be
    /// planned as a fast-path peek, keyed by cluster name. The value names the
    /// index used when the fast-path plan peeks an existing index, and is
    /// `None` for any other fast-path plan.
    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
    /// Set to the configured `persist_fast_path_limit` when the query, with a
    /// `LIMIT 1` finishing, would be planned as a fast-path peek on the target
    /// cluster.
    pub fast_path_limit: Option<usize>,
    /// Names of collections for which the plan computes a global `count(*)`
    /// read directly from persist.
    pub persist_count: Vec<Name>,
}
75
/// An index through which the query could take the fast path on some cluster.
#[derive(Clone, Debug, Serialize)]
pub struct FastPathCluster {
    /// Name of the index.
    index: Name,
    /// Name of the object the index is on.
    on: Name,
}
81
82impl PlanInsights {
83 pub async fn compute_fast_path_clusters(
84 &mut self,
85 humanizer: &dyn ExprHumanizer,
86 ctx: Box<PlanInsightsContext>,
87 ) {
88 if !ctx.optimizer_config.features.enable_fast_path_plan_insights {
92 return;
93 }
94 let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
95 let tasks = ctx
96 .compute_instances
97 .into_iter()
98 .map(|(name, compute_instance)| {
99 let raw_expr = ctx.raw_expr.clone();
100 let mut finishing = ctx.finishing.clone();
101 if name == ctx.target_instance {
103 finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
104 }
105 let session = Arc::clone(&session);
106 let timestamp_context = ctx.timestamp_context.clone();
107 let mut optimizer = optimize::peek::Optimizer::new(
108 Arc::clone(&ctx.catalog),
109 compute_instance,
110 finishing,
111 ctx.view_id,
112 ctx.index_id,
113 ctx.optimizer_config.clone(),
114 ctx.metrics.clone(),
115 );
116 mz_ore::task::spawn_blocking(
117 || "compute fast path clusters",
118 move || {
119 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
121 let local_mir_plan = local_mir_plan.resolve(
123 timestamp_context,
124 &*session,
125 Box::new(EmptyStatisticsOracle {}),
126 );
127 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
129 Ok::<_, OptimizerError>((name, global_lir_plan))
130 },
131 )
132 });
133 for task in tasks {
134 let res = task.await;
135 let Ok(Ok((name, plan))) = res else {
136 continue;
137 };
138 let (plan, _, _) = plan.unapply();
139 if let PeekPlan::FastPath(plan) = plan {
140 if name == ctx.target_instance {
142 self.fast_path_limit =
143 Some(ctx.optimizer_config.features.persist_fast_path_limit);
144 continue;
145 }
146 let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
147 let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
148 Some(FastPathCluster {
149 index: structured_name(humanizer, idx_id),
150 on: structured_name(
151 humanizer,
152 idx_entry.index().expect("must be index").on,
153 ),
154 })
155 } else {
156 None
160 };
161 self.fast_path_clusters.insert(name, idx_name);
162 }
163 }
164 }
165}
166
/// Describes one collection imported by a plan.
#[derive(Clone, Debug, Serialize)]
pub struct ImportInsights {
    /// Structured name of the imported collection.
    pub name: Name,
    /// Whether the collection is read from compute (an index) or from
    /// storage. Serialized under the key `type`.
    #[serde(rename = "type")]
    pub ty: ImportType,
}
176
/// How a plan reads an imported collection. Variants serialize in kebab-case
/// (`"compute"` / `"storage"`).
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ImportType {
    /// Read from an index: used for a dataflow's index imports and for
    /// fast-path peeks of an existing index.
    Compute,
    /// Read from storage: used for a dataflow's source imports and for
    /// fast-path peeks of persist.
    Storage,
}
186
/// A possibly partial, structured name of a catalog object, built by
/// `structured_name` from `ExprHumanizer::humanize_id_parts`.
///
/// Components that are `None` are omitted from the serialized form.
#[derive(Debug, Clone, Serialize)]
pub struct Name {
    /// Database component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database: Option<String>,
    /// Schema component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema: Option<String>,
    /// Item (object) component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub item: Option<String>,
}
200
201pub fn plan_insights(
202 humanizer: &dyn ExprHumanizer,
203 global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
204 fast_path_plan: Option<FastPathPlan>,
205) -> Option<PlanInsights> {
206 match (global_plan, fast_path_plan) {
207 (None, None) => None,
208 (None | Some(_), Some(fast_path_plan)) => {
209 Some(fast_path_insights(humanizer, fast_path_plan))
210 }
211 (Some(global_plan), None) => Some(global_insights(humanizer, global_plan)),
212 }
213}
214
215fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
216 let mut insights = PlanInsights::default();
217 match plan {
218 FastPathPlan::Constant { .. } => (),
219 FastPathPlan::PeekExisting(_, id, _, _) => {
220 add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
221 }
222 FastPathPlan::PeekPersist(id, _, _) => {
223 add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
224 }
225 }
226 insights
227}
228
229fn global_insights(
230 humanizer: &dyn ExprHumanizer,
231 plan: DataflowDescription<OptimizedMirRelationExpr>,
232) -> PlanInsights {
233 let mut insights = PlanInsights::default();
234 for (id, _) in plan.source_imports {
235 add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
236 }
237 for (id, _) in plan.index_imports {
238 add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
239 }
240 for BuildDesc { plan, .. } in plan.objects_to_build {
241 plan.visit_pre(|expr| {
243 let MirRelationExpr::Reduce {
244 input,
245 group_key,
246 aggregates,
247 ..
248 } = expr
249 else {
250 return;
251 };
252 if !group_key.is_empty() {
253 return;
254 }
255 let MirRelationExpr::Project { input, outputs } = &**input else {
256 return;
257 };
258 if !outputs.is_empty() {
259 return;
260 }
261 let MirRelationExpr::Get {
262 id: Id::Global(id),
263 access_strategy: AccessStrategy::Persist,
264 ..
265 } = &**input
266 else {
267 return;
268 };
269 let [aggregate] = aggregates.as_slice() else {
270 return;
271 };
272 if !aggregate.is_count_asterisk() {
273 return;
274 }
275 let name = structured_name(humanizer, *id);
276 insights.persist_count.push(name);
277 });
278 }
279 insights
280}
281
282fn add_import_insights(
283 insights: &mut PlanInsights,
284 humanizer: &dyn ExprHumanizer,
285 id: GlobalId,
286 ty: ImportType,
287) {
288 insights.imports.insert(
289 id.to_string(),
290 ImportInsights {
291 name: structured_name(humanizer, id),
292 ty,
293 },
294 );
295}
296
297fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
298 let mut parts = humanizer.humanize_id_parts(id).unwrap_or(Vec::new());
299 Name {
300 item: parts.pop(),
301 schema: parts.pop(),
302 database: parts.pop(),
303 }
304}