1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, ReprRelationType, SqlRelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
29use mz_transform::{StatisticsOracle, TransformCtx};
30use timely::progress::Antichain;
31use tracing::debug_span;
32
33use crate::TimestampContext;
34use crate::catalog::Catalog;
35use crate::coord::infer_sql_type_for_catalog;
36use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
37use crate::optimize::dataflows::{
38 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
39};
40use crate::optimize::{
41 MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
42 optimize_mir_local, trace_plan,
43};
44
45pub struct Optimizer {
46 typecheck_ctx: SharedTypecheckingContext,
48 catalog: Arc<Catalog>,
50 compute_instance: ComputeInstanceSnapshot,
52 finishing: RowSetFinishing,
54 select_id: GlobalId,
56 index_id: GlobalId,
58 config: OptimizerConfig,
60 metrics: OptimizerMetrics,
62 duration: Duration,
64}
65
66impl Optimizer {
67 pub fn new(
68 catalog: Arc<Catalog>,
69 compute_instance: ComputeInstanceSnapshot,
70 finishing: RowSetFinishing,
71 select_id: GlobalId,
72 index_id: GlobalId,
73 config: OptimizerConfig,
74 metrics: OptimizerMetrics,
75 ) -> Self {
76 Self {
77 typecheck_ctx: empty_typechecking_context(),
78 catalog,
79 compute_instance,
80 finishing,
81 select_id,
82 index_id,
83 config,
84 metrics,
85 duration: Default::default(),
86 }
87 }
88
89 pub fn cluster_id(&self) -> ComputeInstanceId {
90 self.compute_instance.instance_id()
91 }
92
93 pub fn finishing(&self) -> &RowSetFinishing {
94 &self.finishing
95 }
96
97 pub fn select_id(&self) -> GlobalId {
98 self.select_id
99 }
100
101 pub fn index_id(&self) -> GlobalId {
102 self.index_id
103 }
104
105 pub fn config(&self) -> &OptimizerConfig {
106 &self.config
107 }
108
109 pub fn metrics(&self) -> &OptimizerMetrics {
110 &self.metrics
111 }
112
113 pub fn duration(&self) -> Duration {
114 self.duration
115 }
116}
117
118impl Debug for Optimizer {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("OptimizePeek")
126 .field("config", &self.config)
127 .finish_non_exhaustive()
128 }
129}
130
/// Marker context for a [`LocalMirPlan`] that has not yet been given the
/// information attached by [`LocalMirPlan::resolve`].
pub struct Unresolved;
134
135#[derive(Clone)]
138pub struct LocalMirPlan<T = Unresolved> {
139 expr: MirRelationExpr,
140 typ: SqlRelationType,
141 df_meta: DataflowMetainfo,
142 context: T,
143}
144
145pub struct Resolved<'s> {
148 timestamp_ctx: TimestampContext<Timestamp>,
149 stats: Box<dyn StatisticsOracle>,
150 session: &'s dyn SessionMetadata,
151}
152
153#[derive(Debug)]
162pub struct GlobalLirPlan {
163 peek_plan: PeekPlan,
164 df_meta: DataflowMetainfo,
165 typ: SqlRelationType,
166}
167
168impl Optimize<HirRelationExpr> for Optimizer {
169 type To = LocalMirPlan;
170
171 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
172 let time = Instant::now();
173
174 trace_plan!(at: "raw", &expr);
176
177 let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
179
180 let mut df_meta = DataflowMetainfo::default();
182 let mut transform_ctx = TransformCtx::local(
183 &self.config.features,
184 &self.typecheck_ctx,
185 &mut df_meta,
186 Some(&mut self.metrics),
187 Some(self.select_id),
188 );
189 let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
190 let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
191
192 self.duration += time.elapsed();
193
194 Ok(LocalMirPlan {
196 expr: mir_expr,
197 typ,
198 df_meta,
199 context: Unresolved,
200 })
201 }
202}
203
204impl LocalMirPlan<Unresolved> {
205 pub fn resolve(
208 self,
209 timestamp_ctx: TimestampContext<Timestamp>,
210 session: &dyn SessionMetadata,
211 stats: Box<dyn StatisticsOracle>,
212 ) -> LocalMirPlan<Resolved<'_>> {
213 LocalMirPlan {
214 expr: self.expr,
215 typ: self.typ,
216 df_meta: self.df_meta,
217 context: Resolved {
218 timestamp_ctx,
219 session,
220 stats,
221 },
222 }
223 }
224}
225
226impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
227 type To = GlobalLirPlan;
228
229 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
230 let time = Instant::now();
231
232 let LocalMirPlan {
233 expr,
234 typ,
235 mut df_meta,
236 context:
237 Resolved {
238 timestamp_ctx,
239 stats,
240 session,
241 },
242 } = plan;
243
244 let expr = OptimizedMirRelationExpr(expr);
245
246 let key = typ
250 .default_key()
251 .iter()
252 .map(|k| MirScalarExpr::column(*k))
253 .collect();
254
255 let mut df_builder = {
257 let catalog = self.catalog.state();
258 let compute = self.compute_instance.clone();
259 DataflowBuilder::new(catalog, compute).with_config(&self.config)
260 };
261
262 let debug_name = format!("oneshot-select-{}", self.select_id);
263 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
264
265 df_builder.import_view_into_dataflow(
266 &self.select_id,
267 &expr,
268 &mut df_desc,
269 &self.config.features,
270 )?;
271 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
272
273 let style = ExprPrepOneShot {
276 logical_time: EvalTime::Deferred,
277 session,
278 catalog_state: self.catalog.state(),
279 };
280 df_desc.visit_children(
281 |r| style.prep_relation_expr(r),
282 |s| style.prep_scalar_expr(s),
283 )?;
284
285 if self.config.mode != OptimizeMode::Explain {
289 df_desc.export_index(
290 self.index_id,
291 IndexDesc {
292 on_id: self.select_id,
293 key,
294 },
295 ReprRelationType::from(&typ),
296 );
297 }
298
299 df_desc.set_as_of(timestamp_ctx.antichain());
301
302 let as_of = df_desc
304 .as_of
305 .clone()
306 .expect("as_of antichain")
307 .into_option()
308 .expect("unique as_of element");
309
310 let style = ExprPrepOneShot {
312 logical_time: EvalTime::Time(as_of),
313 session,
314 catalog_state: self.catalog.state(),
315 };
316 df_desc.visit_children(
317 |r| style.prep_relation_expr(r),
318 |s| style.prep_scalar_expr(s),
319 )?;
320
321 if let Some(until) = timestamp_ctx
331 .timestamp()
332 .and_then(Timestamp::try_step_forward)
333 {
334 df_desc.until = Antichain::from_elem(until);
335 }
336
337 let mut transform_ctx = TransformCtx::global(
339 &df_builder,
340 &*stats,
341 &self.config.features,
342 &self.typecheck_ctx,
343 &mut df_meta,
344 Some(&mut self.metrics),
345 );
346
347 let use_fast_path_optimizer = match create_fast_path_plan(
352 &mut df_desc,
353 self.select_id,
354 Some(&self.finishing),
355 self.config.features.persist_fast_path_limit,
356 self.config.persist_fast_path_order,
357 ) {
358 Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
359 Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
360 false
363 }
364 Err(e) => {
365 return Err(e);
366 }
367 };
368
369 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
371
372 if self.config.mode == OptimizeMode::Explain {
373 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
375 }
376
377 let peek_plan = match create_fast_path_plan(
387 &mut df_desc,
388 self.select_id,
389 Some(&self.finishing),
390 self.config.features.persist_fast_path_limit,
391 self.config.persist_fast_path_order,
392 )? {
393 Some(plan) if !self.config.no_fast_path => {
394 if self.config.mode == OptimizeMode::Explain {
395 debug_span!(target: "optimizer", "fast_path").in_scope(|| {
397 let finishing = if !self.finishing.is_trivial(typ.arity()) {
399 Some(&self.finishing)
400 } else {
401 None
402 };
403 trace_plan(&plan.used_indexes(finishing));
404 });
405 }
406 trace_plan!(at: "fast_path", &plan);
408
409 trace_plan(&plan);
411
412 PeekPlan::FastPath(plan)
414 }
415 _ => {
416 soft_assert_or_log!(
417 !use_fast_path_optimizer || self.config.no_fast_path,
418 "The fast_path_optimizer shouldn't make a fast path plan slow path."
419 );
420
421 for build in df_desc.objects_to_build.iter_mut() {
423 normalize_lets(&mut build.plan.0, &self.config.features)?
424 }
425
426 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
430
431 trace_plan(&df_desc);
433
434 PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
436 }
437 };
438
439 self.duration += time.elapsed();
440 let label = match &peek_plan {
441 PeekPlan::FastPath(_) => "peek:fast_path",
442 PeekPlan::SlowPath(_) => "peek:slow_path",
443 };
444 self.metrics
445 .observe_e2e_optimization_time(label, self.duration);
446
447 Ok(GlobalLirPlan {
448 peek_plan,
449 df_meta,
450 typ,
451 })
452 }
453}
454
455impl GlobalLirPlan {
456 pub fn peek_plan(&self) -> &PeekPlan {
458 &self.peek_plan
459 }
460
461 pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
463 (self.peek_plan, self.df_meta, self.typ)
464 }
465}