1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
17use mz_expr::{AccessStrategy, Id, MirRelationExpr, OptimizedMirRelationExpr, RowSetFinishing};
18use mz_ore::num::NonNeg;
19use mz_repr::explain::ExprHumanizer;
20use mz_repr::{GlobalId, Timestamp};
21use mz_sql::ast::Statement;
22use mz_sql::names::Aug;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::EmptyStatisticsOracle;
27use serde::Serialize;
28
29use crate::TimestampContext;
30use crate::catalog::Catalog;
31use crate::coord::peek::{FastPathPlan, PeekPlan};
32use crate::optimize::dataflows::ComputeInstanceSnapshot;
33use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
34use crate::session::SessionMeta;
35
/// Inputs needed by [`PlanInsights::compute_fast_path_clusters`] to re-optimize
/// a query once for each compute instance.
#[derive(Debug)]
pub struct PlanInsightsContext {
    /// NOTE(review): presumably the statement being explained; not read by the
    /// code in this file.
    pub stmt: Option<Statement<Aug>>,
    /// The query's HIR expression, cloned and re-optimized for each instance.
    pub raw_expr: HirRelationExpr,
    /// Catalog handed to each per-instance optimizer; also used to look up the
    /// object an index is built on.
    pub catalog: Arc<Catalog>,
    /// Snapshots of the compute instances to try, keyed by instance name.
    pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
    /// Name of the instance the query targets. That instance is optimized with
    /// its finishing limit forced to 1, and its result is reported through
    /// `PlanInsights::fast_path_limit` instead of `fast_path_clusters`.
    pub target_instance: String,
    /// Optimizer metrics, cloned into each per-instance optimizer.
    pub metrics: OptimizerMetrics,
    /// Row-set finishing, cloned into each per-instance optimizer.
    pub finishing: RowSetFinishing,
    /// Optimizer configuration, cloned into each per-instance optimizer; its
    /// `features.persist_fast_path_limit` is also reported as an insight.
    pub optimizer_config: OptimizerConfig,
    /// Session metadata used when resolving the locally optimized plan.
    pub session: SessionMeta,
    /// Timestamp context used when resolving the locally optimized plan.
    pub timestamp_context: TimestampContext<Timestamp>,
    /// ID passed to each peek optimizer as its view ID.
    pub view_id: GlobalId,
    /// ID passed to each peek optimizer as its index ID.
    pub index_id: GlobalId,
    /// NOTE(review): not read by the code in this file; confirm its purpose
    /// with callers.
    pub enable_re_optimize: bool,
}
57
/// Serializable insights about an optimized plan.
#[derive(Clone, Debug, Default, Serialize)]
pub struct PlanInsights {
    /// Collections imported by the plan, keyed by the string form of their
    /// `GlobalId`.
    pub imports: BTreeMap<String, ImportInsights>,
    /// Compute instances, other than the target instance, on which the query
    /// would be planned as a fast-path peek, keyed by instance name.
    ///
    /// The value names the index used (and the object it is on) when the
    /// fast-path plan is `FastPathPlan::PeekExisting`, and is `None` for any
    /// other kind of fast-path plan.
    pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
    /// Set to the configured persist fast-path limit when the query, with its
    /// limit forced to 1 on the target instance, would be planned as a fast
    /// path.
    pub fast_path_limit: Option<usize>,
    /// Names of collections read with the `Persist` access strategy that feed
    /// (through an empty projection) an ungrouped `count(*)` reduction.
    pub persist_count: Vec<Name>,
}
75
/// An index that makes a query plannable as a fast-path peek on some cluster.
#[derive(Clone, Debug, Serialize)]
pub struct FastPathCluster {
    /// Name of the index the fast-path peek would read.
    index: Name,
    /// Name of the object the index is built on.
    on: Name,
}
81
82impl PlanInsights {
83 pub async fn compute_fast_path_clusters(
84 &mut self,
85 humanizer: &dyn ExprHumanizer,
86 ctx: Box<PlanInsightsContext>,
87 ) {
88 let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
89 let tasks = ctx
90 .compute_instances
91 .into_iter()
92 .map(|(name, compute_instance)| {
93 let raw_expr = ctx.raw_expr.clone();
94 let mut finishing = ctx.finishing.clone();
95 if name == ctx.target_instance {
97 finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
98 }
99 let session = Arc::clone(&session);
100 let timestamp_context = ctx.timestamp_context.clone();
101 let mut optimizer = optimize::peek::Optimizer::new(
102 Arc::clone(&ctx.catalog),
103 compute_instance,
104 finishing,
105 ctx.view_id,
106 ctx.index_id,
107 ctx.optimizer_config.clone(),
108 ctx.metrics.clone(),
109 );
110 mz_ore::task::spawn_blocking(
111 || "compute fast path clusters",
112 move || {
113 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
115 let local_mir_plan = local_mir_plan.resolve(
117 timestamp_context,
118 &*session,
119 Box::new(EmptyStatisticsOracle {}),
120 );
121 let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
123 Ok::<_, OptimizerError>((name, global_lir_plan))
124 },
125 )
126 });
127 for task in tasks {
128 let res = task.await;
129 let Ok(Ok((name, plan))) = res else {
130 continue;
131 };
132 let (plan, _, _) = plan.unapply();
133 if let PeekPlan::FastPath(plan) = plan {
134 if name == ctx.target_instance {
136 self.fast_path_limit =
137 Some(ctx.optimizer_config.features.persist_fast_path_limit);
138 continue;
139 }
140 let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
141 let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
142 Some(FastPathCluster {
143 index: structured_name(humanizer, idx_id),
144 on: structured_name(
145 humanizer,
146 idx_entry.index().expect("must be index").on,
147 ),
148 })
149 } else {
150 None
154 };
155 self.fast_path_clusters.insert(name, idx_name);
156 }
157 }
158 }
159}
160
/// Insights about one collection imported by a plan.
#[derive(Clone, Debug, Serialize)]
pub struct ImportInsights {
    /// Structured name of the imported collection.
    pub name: Name,
    /// Whether the collection is imported from compute or from storage.
    ///
    /// Serialized under the key `type`, since `type` is a Rust keyword.
    #[serde(rename = "type")]
    pub ty: ImportType,
}
170
/// Where an imported collection comes from; serialized in kebab-case.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ImportType {
    /// An index import of a dataflow, or the index read by a
    /// `FastPathPlan::PeekExisting`.
    Compute,
    /// A source import of a dataflow, or the collection read by a
    /// `FastPathPlan::PeekPersist`.
    Storage,
}
180
/// A possibly partial qualified name of a catalog object.
///
/// Components that are unknown are `None` and are omitted from serialized
/// output.
#[derive(Debug, Clone, Serialize)]
pub struct Name {
    /// Database component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub database: Option<String>,
    /// Schema component, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema: Option<String>,
    /// Item component (the object's own name), if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub item: Option<String>,
}
194
195pub fn plan_insights(
196 humanizer: &dyn ExprHumanizer,
197 global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
198 fast_path_plan: Option<FastPathPlan>,
199) -> Option<PlanInsights> {
200 match (global_plan, fast_path_plan) {
201 (None, None) => None,
202 (None | Some(_), Some(fast_path_plan)) => {
203 Some(fast_path_insights(humanizer, fast_path_plan))
204 }
205 (Some(global_plan), None) => Some(global_insights(humanizer, global_plan)),
206 }
207}
208
209fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
210 let mut insights = PlanInsights::default();
211 match plan {
212 FastPathPlan::Constant { .. } => (),
213 FastPathPlan::PeekExisting(_, id, _, _) => {
214 add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
215 }
216 FastPathPlan::PeekPersist(id, _, _) => {
217 add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
218 }
219 }
220 insights
221}
222
223fn global_insights(
224 humanizer: &dyn ExprHumanizer,
225 plan: DataflowDescription<OptimizedMirRelationExpr>,
226) -> PlanInsights {
227 let mut insights = PlanInsights::default();
228 for (id, _) in plan.source_imports {
229 add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
230 }
231 for (id, _) in plan.index_imports {
232 add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
233 }
234 for BuildDesc { plan, .. } in plan.objects_to_build {
235 plan.visit_pre(|expr| {
237 let MirRelationExpr::Reduce {
238 input,
239 group_key,
240 aggregates,
241 ..
242 } = expr
243 else {
244 return;
245 };
246 if !group_key.is_empty() {
247 return;
248 }
249 let MirRelationExpr::Project { input, outputs } = &**input else {
250 return;
251 };
252 if !outputs.is_empty() {
253 return;
254 }
255 let MirRelationExpr::Get {
256 id: Id::Global(id),
257 access_strategy: AccessStrategy::Persist,
258 ..
259 } = &**input
260 else {
261 return;
262 };
263 let [aggregate] = aggregates.as_slice() else {
264 return;
265 };
266 if !aggregate.is_count_asterisk() {
267 return;
268 }
269 let name = structured_name(humanizer, *id);
270 insights.persist_count.push(name);
271 });
272 }
273 insights
274}
275
276fn add_import_insights(
277 insights: &mut PlanInsights,
278 humanizer: &dyn ExprHumanizer,
279 id: GlobalId,
280 ty: ImportType,
281) {
282 insights.imports.insert(
283 id.to_string(),
284 ImportInsights {
285 name: structured_name(humanizer, id),
286 ty,
287 },
288 );
289}
290
291fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
292 let mut parts = humanizer.humanize_id_parts(id).unwrap_or(Vec::new());
293 Name {
294 item: parts.pop(),
295 schema: parts.pop(),
296 database: parts.pop(),
297 }
298}