1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, RelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
29use mz_transform::{StatisticsOracle, TransformCtx};
30use timely::progress::Antichain;
31use tracing::debug_span;
32
33use crate::TimestampContext;
34use crate::catalog::Catalog;
35use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
36use crate::optimize::dataflows::{
37 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
38 prep_scalar_expr,
39};
40use crate::optimize::{
41 MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
42 optimize_mir_local, trace_plan,
43};
44
45pub struct Optimizer {
46 typecheck_ctx: TypecheckContext,
48 catalog: Arc<Catalog>,
50 compute_instance: ComputeInstanceSnapshot,
52 finishing: RowSetFinishing,
54 select_id: GlobalId,
56 index_id: GlobalId,
58 config: OptimizerConfig,
60 metrics: OptimizerMetrics,
62 duration: Duration,
64}
65
66impl Optimizer {
67 pub fn new(
68 catalog: Arc<Catalog>,
69 compute_instance: ComputeInstanceSnapshot,
70 finishing: RowSetFinishing,
71 select_id: GlobalId,
72 index_id: GlobalId,
73 config: OptimizerConfig,
74 metrics: OptimizerMetrics,
75 ) -> Self {
76 Self {
77 typecheck_ctx: empty_context(),
78 catalog,
79 compute_instance,
80 finishing,
81 select_id,
82 index_id,
83 config,
84 metrics,
85 duration: Default::default(),
86 }
87 }
88
89 pub fn cluster_id(&self) -> ComputeInstanceId {
90 self.compute_instance.instance_id()
91 }
92
93 pub fn finishing(&self) -> &RowSetFinishing {
94 &self.finishing
95 }
96
97 pub fn select_id(&self) -> GlobalId {
98 self.select_id
99 }
100
101 pub fn index_id(&self) -> GlobalId {
102 self.index_id
103 }
104
105 pub fn config(&self) -> &OptimizerConfig {
106 &self.config
107 }
108
109 pub fn metrics(&self) -> &OptimizerMetrics {
110 &self.metrics
111 }
112
113 pub fn duration(&self) -> Duration {
114 self.duration
115 }
116}
117
118impl Debug for Optimizer {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("OptimizePeek")
126 .field("config", &self.config)
127 .finish_non_exhaustive()
128 }
129}
130
/// Marker used as the `context` of a `LocalMirPlan` that has not been resolved
/// yet, i.e. one that carries no timestamp, statistics, or session.
///
/// Deriving `Clone` makes the `#[derive(Clone)]` on `LocalMirPlan` usable for
/// its default type parameter; `Debug` keeps this public type printable.
#[derive(Clone, Debug)]
pub struct Unresolved;
134
135#[derive(Clone)]
138pub struct LocalMirPlan<T = Unresolved> {
139 expr: MirRelationExpr,
140 df_meta: DataflowMetainfo,
141 context: T,
142}
143
144pub struct Resolved<'s> {
147 timestamp_ctx: TimestampContext<Timestamp>,
148 stats: Box<dyn StatisticsOracle>,
149 session: &'s dyn SessionMetadata,
150}
151
152#[derive(Debug)]
161pub struct GlobalLirPlan {
162 peek_plan: PeekPlan,
163 df_meta: DataflowMetainfo,
164 typ: RelationType,
165}
166
167impl Optimize<HirRelationExpr> for Optimizer {
168 type To = LocalMirPlan;
169
170 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
171 let time = Instant::now();
172
173 trace_plan!(at: "raw", &expr);
175
176 let expr = expr.lower(&self.config, Some(&self.metrics))?;
178
179 let mut df_meta = DataflowMetainfo::default();
181 let mut transform_ctx = TransformCtx::local(
182 &self.config.features,
183 &self.typecheck_ctx,
184 &mut df_meta,
185 Some(&self.metrics),
186 );
187 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
188
189 self.duration += time.elapsed();
190
191 Ok(LocalMirPlan {
193 expr,
194 df_meta,
195 context: Unresolved,
196 })
197 }
198}
199
200impl LocalMirPlan<Unresolved> {
201 pub fn resolve(
204 self,
205 timestamp_ctx: TimestampContext<Timestamp>,
206 session: &dyn SessionMetadata,
207 stats: Box<dyn StatisticsOracle>,
208 ) -> LocalMirPlan<Resolved> {
209 LocalMirPlan {
210 expr: self.expr,
211 df_meta: self.df_meta,
212 context: Resolved {
213 timestamp_ctx,
214 session,
215 stats,
216 },
217 }
218 }
219}
220
221impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
222 type To = GlobalLirPlan;
223
224 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
225 let time = Instant::now();
226
227 let LocalMirPlan {
228 expr,
229 mut df_meta,
230 context:
231 Resolved {
232 timestamp_ctx,
233 stats,
234 session,
235 },
236 } = plan;
237
238 let expr = OptimizedMirRelationExpr(expr);
239
240 let typ = expr.typ();
244 let key = typ
245 .default_key()
246 .iter()
247 .map(|k| MirScalarExpr::Column(*k))
248 .collect();
249
250 let mut df_builder = {
252 let catalog = self.catalog.state();
253 let compute = self.compute_instance.clone();
254 DataflowBuilder::new(catalog, compute).with_config(&self.config)
255 };
256
257 let debug_name = format!("oneshot-select-{}", self.select_id);
258 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
259
260 df_builder.import_view_into_dataflow(
261 &self.select_id,
262 &expr,
263 &mut df_desc,
264 &self.config.features,
265 )?;
266 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
267
268 let style = ExprPrepStyle::OneShot {
271 logical_time: EvalTime::Deferred,
272 session,
273 catalog_state: self.catalog.state(),
274 };
275 df_desc.visit_children(
276 |r| prep_relation_expr(r, style),
277 |s| prep_scalar_expr(s, style),
278 )?;
279
280 if self.config.mode != OptimizeMode::Explain {
284 df_desc.export_index(
285 self.index_id,
286 IndexDesc {
287 on_id: self.select_id,
288 key,
289 },
290 typ.clone(),
291 );
292 }
293
294 df_desc.set_as_of(timestamp_ctx.antichain());
296
297 if let Some(until) = timestamp_ctx
307 .timestamp()
308 .and_then(Timestamp::try_step_forward)
309 {
310 df_desc.until = Antichain::from_elem(until);
311 }
312
313 let mut transform_ctx = TransformCtx::global(
315 &df_builder,
316 &*stats,
317 &self.config.features,
318 &self.typecheck_ctx,
319 &mut df_meta,
320 Some(&self.metrics),
321 );
322
323 let use_fast_path_optimizer = match create_fast_path_plan(
328 &mut df_desc,
329 self.select_id,
330 Some(&self.finishing),
331 self.config.features.persist_fast_path_limit,
332 self.config.persist_fast_path_order,
333 ) {
334 Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
335 Err(OptimizerError::UnsafeMfpPlan) => {
336 false
339 }
340 Err(e) => {
341 return Err(e);
342 }
343 };
344
345 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
347
348 if self.config.mode == OptimizeMode::Explain {
349 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
351 }
352
353 let as_of = df_desc
355 .as_of
356 .clone()
357 .expect("as_of antichain")
358 .into_option()
359 .expect("unique as_of element");
360
361 let style = ExprPrepStyle::OneShot {
363 logical_time: EvalTime::Time(as_of),
364 session,
365 catalog_state: self.catalog.state(),
366 };
367 df_desc.visit_children(
368 |r| prep_relation_expr(r, style),
369 |s| prep_scalar_expr(s, style),
370 )?;
371
372 let peek_plan = match create_fast_path_plan(
382 &mut df_desc,
383 self.select_id,
384 Some(&self.finishing),
385 self.config.features.persist_fast_path_limit,
386 self.config.persist_fast_path_order,
387 )? {
388 Some(plan) if !self.config.no_fast_path => {
389 if self.config.mode == OptimizeMode::Explain {
390 debug_span!(target: "optimizer", "fast_path").in_scope(|| {
392 let finishing = if !self.finishing.is_trivial(typ.arity()) {
394 Some(&self.finishing)
395 } else {
396 None
397 };
398 trace_plan(&plan.used_indexes(finishing));
399 });
400 }
401 trace_plan!(at: "fast_path", &plan);
403
404 trace_plan(&plan);
406
407 PeekPlan::FastPath(plan)
409 }
410 _ => {
411 soft_assert_or_log!(
412 !use_fast_path_optimizer || self.config.no_fast_path,
413 "The fast_path_optimizer shouldn't make a fast path plan slow path."
414 );
415
416 for build in df_desc.objects_to_build.iter_mut() {
418 normalize_lets(&mut build.plan.0, &self.config.features)?
419 }
420
421 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
425
426 trace_plan(&df_desc);
428
429 PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
431 }
432 };
433
434 self.duration += time.elapsed();
435 let label = match &peek_plan {
436 PeekPlan::FastPath(_) => "peek:fast_path",
437 PeekPlan::SlowPath(_) => "peek:slow_path",
438 };
439 self.metrics
440 .observe_e2e_optimization_time(label, self.duration);
441
442 Ok(GlobalLirPlan {
443 peek_plan,
444 df_meta,
445 typ,
446 })
447 }
448}
449
450impl GlobalLirPlan {
451 pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, RelationType) {
453 (self.peek_plan, self.df_meta, self.typ)
454 }
455}