1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, SqlRelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use mz_transform::{StatisticsOracle, TransformCtx};
32use timely::progress::Antichain;
33use tracing::debug_span;
34
35use crate::TimestampContext;
36use crate::catalog::Catalog;
37use crate::coord::infer_sql_type_for_catalog;
38use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
39use crate::optimize::dataflows::{
40 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
41};
42use crate::optimize::{
43 MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
44 optimize_mir_local, trace_plan,
45};
46
/// State carried across the optimization stages implemented in this file.
///
/// One value is built per query via [`Optimizer::new`] and then driven through
/// the `Optimize` implementations below.
pub struct Optimizer {
    /// Type-checking context handed to both the local and the global
    /// `TransformCtx`; starts out as `empty_repr_context()`.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog whose `state()` backs the `DataflowBuilder` and the
    /// expression-preparation passes.
    catalog: Arc<Catalog>,
    /// Snapshot of the target compute instance; its `instance_id()` is what
    /// `cluster_id()` reports.
    compute_instance: ComputeInstanceSnapshot,
    /// Row-set finishing passed to `create_fast_path_plan`.
    finishing: RowSetFinishing,
    /// Id under which the optimized expression is imported as a view into the
    /// dataflow (also embedded in the `oneshot-select-{id}` debug name).
    select_id: GlobalId,
    /// Id of the index exported on top of `select_id`.
    index_id: GlobalId,
    /// Optimizer configuration (mode, feature flags, fast-path switches).
    config: OptimizerConfig,
    /// Metrics sink; receives the end-to-end optimization time when the last
    /// stage completes.
    metrics: OptimizerMetrics,
    /// Wall-clock time accumulated by the `optimize` calls made so far.
    duration: Duration,
}
67
68impl Optimizer {
69 pub fn new(
70 catalog: Arc<Catalog>,
71 compute_instance: ComputeInstanceSnapshot,
72 finishing: RowSetFinishing,
73 select_id: GlobalId,
74 index_id: GlobalId,
75 config: OptimizerConfig,
76 metrics: OptimizerMetrics,
77 ) -> Self {
78 Self {
79 repr_typecheck_ctx: empty_repr_context(),
80 catalog,
81 compute_instance,
82 finishing,
83 select_id,
84 index_id,
85 config,
86 metrics,
87 duration: Default::default(),
88 }
89 }
90
91 pub fn cluster_id(&self) -> ComputeInstanceId {
92 self.compute_instance.instance_id()
93 }
94
95 pub fn finishing(&self) -> &RowSetFinishing {
96 &self.finishing
97 }
98
99 pub fn select_id(&self) -> GlobalId {
100 self.select_id
101 }
102
103 pub fn index_id(&self) -> GlobalId {
104 self.index_id
105 }
106
107 pub fn config(&self) -> &OptimizerConfig {
108 &self.config
109 }
110
111 pub fn metrics(&self) -> &OptimizerMetrics {
112 &self.metrics
113 }
114
115 pub fn duration(&self) -> Duration {
116 self.duration
117 }
118}
119
120impl Debug for Optimizer {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 f.debug_struct("OptimizePeek")
128 .field("config", &self.config)
129 .finish_non_exhaustive()
130 }
131}
132
/// Data-free marker used as the default context parameter of
/// [`LocalMirPlan`]: the plan has not yet been given a [`Resolved`] context.
pub struct Unresolved;
136
/// Output of the HIR-to-MIR stage: a locally optimized MIR expression
/// together with the metadata the next stage needs.
///
/// The `T` parameter is the plan's context: [`Unresolved`] straight out of the
/// first stage, [`Resolved`] after `resolve` has been called.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The MIR expression produced by local optimization.
    expr: MirRelationExpr,
    /// SQL relation type computed by `infer_sql_type_for_catalog`.
    typ: SqlRelationType,
    /// Dataflow metainfo populated by the transform context during optimization.
    df_meta: DataflowMetainfo,
    /// Stage-specific context; see the type-level docs.
    context: T,
}
146
/// Context attached to a [`LocalMirPlan`] by `resolve`, carrying what the
/// global optimization stage needs in addition to the plan itself.
pub struct Resolved<'s> {
    /// Timestamp context; supplies the dataflow `as_of` and, when it has a
    /// timestamp, the `until` frontier.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle handed to the global `TransformCtx`.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used by the one-shot expression-preparation passes.
    session: &'s dyn SessionMetadata,
}
154
/// Final result of the peek optimization pipeline.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// The chosen plan: `PeekPlan::FastPath` or `PeekPlan::SlowPath`.
    peek_plan: PeekPlan,
    /// Dataflow metainfo accumulated during optimization.
    df_meta: DataflowMetainfo,
    /// SQL relation type of the result, carried over from the local plan.
    typ: SqlRelationType,
}
169
170impl Optimize<HirRelationExpr> for Optimizer {
171 type To = LocalMirPlan;
172
173 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
174 let time = Instant::now();
175
176 trace_plan!(at: "raw", &expr);
178
179 let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
181
182 let mut df_meta = DataflowMetainfo::default();
184 let mut transform_ctx = TransformCtx::local(
185 &self.config.features,
186 &self.repr_typecheck_ctx,
187 &mut df_meta,
188 Some(&mut self.metrics),
189 Some(self.select_id),
190 );
191 let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
192 let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
193
194 self.duration += time.elapsed();
195
196 Ok(LocalMirPlan {
198 expr: mir_expr,
199 typ,
200 df_meta,
201 context: Unresolved,
202 })
203 }
204}
205
206impl LocalMirPlan<Unresolved> {
207 pub fn resolve(
210 self,
211 timestamp_ctx: TimestampContext<Timestamp>,
212 session: &dyn SessionMetadata,
213 stats: Box<dyn StatisticsOracle>,
214 ) -> LocalMirPlan<Resolved<'_>> {
215 LocalMirPlan {
216 expr: self.expr,
217 typ: self.typ,
218 df_meta: self.df_meta,
219 context: Resolved {
220 timestamp_ctx,
221 session,
222 stats,
223 },
224 }
225 }
226}
227
228impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
229 type To = GlobalLirPlan;
230
231 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
232 let time = Instant::now();
233
234 let LocalMirPlan {
235 expr,
236 typ,
237 mut df_meta,
238 context:
239 Resolved {
240 timestamp_ctx,
241 stats,
242 session,
243 },
244 } = plan;
245
246 let expr = OptimizedMirRelationExpr(expr);
247
248 let key = typ
252 .default_key()
253 .iter()
254 .map(|k| MirScalarExpr::column(*k))
255 .collect();
256
257 let mut df_builder = {
259 let catalog = self.catalog.state();
260 let compute = self.compute_instance.clone();
261 DataflowBuilder::new(catalog, compute).with_config(&self.config)
262 };
263
264 let debug_name = format!("oneshot-select-{}", self.select_id);
265 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
266
267 df_builder.import_view_into_dataflow(
268 &self.select_id,
269 &expr,
270 &mut df_desc,
271 &self.config.features,
272 )?;
273 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
274
275 let style = ExprPrepOneShot {
278 logical_time: EvalTime::Deferred,
279 session,
280 catalog_state: self.catalog.state(),
281 };
282 df_desc.visit_children(
283 |r| style.prep_relation_expr(r),
284 |s| style.prep_scalar_expr(s),
285 )?;
286
287 if self.config.mode != OptimizeMode::Explain {
291 df_desc.export_index(
292 self.index_id,
293 IndexDesc {
294 on_id: self.select_id,
295 key,
296 },
297 typ.clone(),
298 );
299 }
300
301 df_desc.set_as_of(timestamp_ctx.antichain());
303
304 let as_of = df_desc
306 .as_of
307 .clone()
308 .expect("as_of antichain")
309 .into_option()
310 .expect("unique as_of element");
311
312 let style = ExprPrepOneShot {
314 logical_time: EvalTime::Time(as_of),
315 session,
316 catalog_state: self.catalog.state(),
317 };
318 df_desc.visit_children(
319 |r| style.prep_relation_expr(r),
320 |s| style.prep_scalar_expr(s),
321 )?;
322
323 if let Some(until) = timestamp_ctx
333 .timestamp()
334 .and_then(Timestamp::try_step_forward)
335 {
336 df_desc.until = Antichain::from_elem(until);
337 }
338
339 let mut transform_ctx = TransformCtx::global(
341 &df_builder,
342 &*stats,
343 &self.config.features,
344 &self.repr_typecheck_ctx,
345 &mut df_meta,
346 Some(&mut self.metrics),
347 );
348
349 let use_fast_path_optimizer = match create_fast_path_plan(
354 &mut df_desc,
355 self.select_id,
356 Some(&self.finishing),
357 self.config.features.persist_fast_path_limit,
358 self.config.persist_fast_path_order,
359 ) {
360 Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
361 Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
362 false
365 }
366 Err(e) => {
367 return Err(e);
368 }
369 };
370
371 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
373
374 if self.config.mode == OptimizeMode::Explain {
375 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
377 }
378
379 let peek_plan = match create_fast_path_plan(
389 &mut df_desc,
390 self.select_id,
391 Some(&self.finishing),
392 self.config.features.persist_fast_path_limit,
393 self.config.persist_fast_path_order,
394 )? {
395 Some(plan) if !self.config.no_fast_path => {
396 if self.config.mode == OptimizeMode::Explain {
397 debug_span!(target: "optimizer", "fast_path").in_scope(|| {
399 let finishing = if !self.finishing.is_trivial(typ.arity()) {
401 Some(&self.finishing)
402 } else {
403 None
404 };
405 trace_plan(&plan.used_indexes(finishing));
406 });
407 }
408 trace_plan!(at: "fast_path", &plan);
410
411 trace_plan(&plan);
413
414 PeekPlan::FastPath(plan)
416 }
417 _ => {
418 soft_assert_or_log!(
419 !use_fast_path_optimizer || self.config.no_fast_path,
420 "The fast_path_optimizer shouldn't make a fast path plan slow path."
421 );
422
423 for build in df_desc.objects_to_build.iter_mut() {
425 normalize_lets(&mut build.plan.0, &self.config.features)?
426 }
427
428 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
432
433 trace_plan(&df_desc);
435
436 PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
438 }
439 };
440
441 self.duration += time.elapsed();
442 let label = match &peek_plan {
443 PeekPlan::FastPath(_) => "peek:fast_path",
444 PeekPlan::SlowPath(_) => "peek:slow_path",
445 };
446 self.metrics
447 .observe_e2e_optimization_time(label, self.duration);
448
449 Ok(GlobalLirPlan {
450 peek_plan,
451 df_meta,
452 typ,
453 })
454 }
455}
456
457impl GlobalLirPlan {
458 pub fn peek_plan(&self) -> &PeekPlan {
460 &self.peek_plan
461 }
462
463 pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
465 (self.peek_plan, self.df_meta, self.typ)
466 }
467}