1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, SqlRelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use mz_transform::{StatisticsOracle, TransformCtx};
32use timely::progress::Antichain;
33use tracing::debug_span;
34
35use crate::TimestampContext;
36use crate::catalog::Catalog;
37use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
38use crate::optimize::dataflows::{
39 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
40};
41use crate::optimize::{
42 MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
43 optimize_mir_local, trace_plan,
44};
45
/// Optimizer for one-shot `SELECT` queries that are answered with a peek.
///
/// Optimization runs in two stages, both driven through [`Optimize`]:
/// `HirRelationExpr` ⇒ [`LocalMirPlan`] (local MIR optimization), and then a
/// resolved [`LocalMirPlan`] ⇒ [`GlobalLirPlan`] (global optimization and
/// fast-path / slow-path planning).
pub struct Optimizer {
    /// Typechecking context handed to both the local and global transform contexts.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog used to build the dataflow and to prepare expressions.
    catalog: Arc<Catalog>,
    /// Snapshot of the compute instance the peek targets (see [`Optimizer::cluster_id`]).
    compute_instance: ComputeInstanceSnapshot,
    /// Finishing for the peek's results; passed to the fast-path planner.
    finishing: RowSetFinishing,
    /// Id under which the optimized `SELECT` expression is imported into the dataflow.
    select_id: GlobalId,
    /// Id of the index exported by the dataflow (not exported in EXPLAIN mode).
    index_id: GlobalId,
    /// Optimizer configuration (feature flags, mode, fast-path settings).
    config: OptimizerConfig,
    /// Metrics sink; also receives the end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Time accumulated across the `optimize` stages run so far.
    duration: Duration,
}
66
67impl Optimizer {
68 pub fn new(
69 catalog: Arc<Catalog>,
70 compute_instance: ComputeInstanceSnapshot,
71 finishing: RowSetFinishing,
72 select_id: GlobalId,
73 index_id: GlobalId,
74 config: OptimizerConfig,
75 metrics: OptimizerMetrics,
76 ) -> Self {
77 Self {
78 repr_typecheck_ctx: empty_repr_context(),
79 catalog,
80 compute_instance,
81 finishing,
82 select_id,
83 index_id,
84 config,
85 metrics,
86 duration: Default::default(),
87 }
88 }
89
90 pub fn cluster_id(&self) -> ComputeInstanceId {
91 self.compute_instance.instance_id()
92 }
93
94 pub fn finishing(&self) -> &RowSetFinishing {
95 &self.finishing
96 }
97
98 pub fn select_id(&self) -> GlobalId {
99 self.select_id
100 }
101
102 pub fn index_id(&self) -> GlobalId {
103 self.index_id
104 }
105
106 pub fn config(&self) -> &OptimizerConfig {
107 &self.config
108 }
109
110 pub fn metrics(&self) -> &OptimizerMetrics {
111 &self.metrics
112 }
113
114 pub fn duration(&self) -> Duration {
115 self.duration
116 }
117}
118
119impl Debug for Optimizer {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("OptimizePeek")
127 .field("config", &self.config)
128 .finish_non_exhaustive()
129 }
130}
131
/// Marker context for a [`LocalMirPlan`] that has not yet been given the
/// timestamp context, session and statistics the global stage needs
/// (see [`LocalMirPlan::resolve`]).
pub struct Unresolved;
135
/// Output of the local stage: a locally optimized MIR expression together
/// with the dataflow metainfo collected while producing it.
///
/// `T` is the stage context: [`Unresolved`] right after local optimization,
/// [`Resolved`] once [`LocalMirPlan::resolve`] has attached the inputs that
/// the global stage consumes.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Metainfo gathered by the local transforms; extended by the global stage.
    df_meta: DataflowMetainfo,
    /// Stage-specific context.
    context: T,
}
144
/// Context attached by [`LocalMirPlan::resolve`]: the inputs consumed by the
/// global optimization stage.
pub struct Resolved<'s> {
    /// Supplies the dataflow's `as_of` and, when it can be computed, its `until`.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle handed to the global transform context.
    stats: Box<dyn StatisticsOracle>,
    /// Session used when preparing expressions for one-shot evaluation.
    session: &'s dyn SessionMetadata,
}
152
/// Output of the global stage: the final peek plan (fast path or slow path),
/// the dataflow metainfo, and the type of the query's result.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// Either a fast-path plan or a finalized slow-path dataflow.
    peek_plan: PeekPlan,
    /// Metainfo accumulated over both optimization stages.
    df_meta: DataflowMetainfo,
    /// Type of the optimized `SELECT` expression.
    typ: SqlRelationType,
}
167
168impl Optimize<HirRelationExpr> for Optimizer {
169 type To = LocalMirPlan;
170
171 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
172 let time = Instant::now();
173
174 trace_plan!(at: "raw", &expr);
176
177 let expr = expr.lower(&self.config, Some(&self.metrics))?;
179
180 let mut df_meta = DataflowMetainfo::default();
182 let mut transform_ctx = TransformCtx::local(
183 &self.config.features,
184 &self.repr_typecheck_ctx,
185 &mut df_meta,
186 Some(&mut self.metrics),
187 Some(self.select_id),
188 );
189 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
190
191 self.duration += time.elapsed();
192
193 Ok(LocalMirPlan {
195 expr,
196 df_meta,
197 context: Unresolved,
198 })
199 }
200}
201
202impl LocalMirPlan<Unresolved> {
203 pub fn resolve(
206 self,
207 timestamp_ctx: TimestampContext<Timestamp>,
208 session: &dyn SessionMetadata,
209 stats: Box<dyn StatisticsOracle>,
210 ) -> LocalMirPlan<Resolved<'_>> {
211 LocalMirPlan {
212 expr: self.expr,
213 df_meta: self.df_meta,
214 context: Resolved {
215 timestamp_ctx,
216 session,
217 stats,
218 },
219 }
220 }
221}
222
223impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
224 type To = GlobalLirPlan;
225
226 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
227 let time = Instant::now();
228
229 let LocalMirPlan {
230 expr,
231 mut df_meta,
232 context:
233 Resolved {
234 timestamp_ctx,
235 stats,
236 session,
237 },
238 } = plan;
239
240 let expr = OptimizedMirRelationExpr(expr);
241
242 let typ = expr.typ();
246 let key = typ
247 .default_key()
248 .iter()
249 .map(|k| MirScalarExpr::column(*k))
250 .collect();
251
252 let mut df_builder = {
254 let catalog = self.catalog.state();
255 let compute = self.compute_instance.clone();
256 DataflowBuilder::new(catalog, compute).with_config(&self.config)
257 };
258
259 let debug_name = format!("oneshot-select-{}", self.select_id);
260 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
261
262 df_builder.import_view_into_dataflow(
263 &self.select_id,
264 &expr,
265 &mut df_desc,
266 &self.config.features,
267 )?;
268 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
269
270 let style = ExprPrepOneShot {
273 logical_time: EvalTime::Deferred,
274 session,
275 catalog_state: self.catalog.state(),
276 };
277 df_desc.visit_children(
278 |r| style.prep_relation_expr(r),
279 |s| style.prep_scalar_expr(s),
280 )?;
281
282 if self.config.mode != OptimizeMode::Explain {
286 df_desc.export_index(
287 self.index_id,
288 IndexDesc {
289 on_id: self.select_id,
290 key,
291 },
292 typ.clone(),
293 );
294 }
295
296 df_desc.set_as_of(timestamp_ctx.antichain());
298
299 let as_of = df_desc
301 .as_of
302 .clone()
303 .expect("as_of antichain")
304 .into_option()
305 .expect("unique as_of element");
306
307 let style = ExprPrepOneShot {
309 logical_time: EvalTime::Time(as_of),
310 session,
311 catalog_state: self.catalog.state(),
312 };
313 df_desc.visit_children(
314 |r| style.prep_relation_expr(r),
315 |s| style.prep_scalar_expr(s),
316 )?;
317
318 if let Some(until) = timestamp_ctx
328 .timestamp()
329 .and_then(Timestamp::try_step_forward)
330 {
331 df_desc.until = Antichain::from_elem(until);
332 }
333
334 let mut transform_ctx = TransformCtx::global(
336 &df_builder,
337 &*stats,
338 &self.config.features,
339 &self.repr_typecheck_ctx,
340 &mut df_meta,
341 Some(&mut self.metrics),
342 );
343
344 let use_fast_path_optimizer = match create_fast_path_plan(
349 &mut df_desc,
350 self.select_id,
351 Some(&self.finishing),
352 self.config.features.persist_fast_path_limit,
353 self.config.persist_fast_path_order,
354 ) {
355 Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
356 Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
357 false
360 }
361 Err(e) => {
362 return Err(e);
363 }
364 };
365
366 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
368
369 if self.config.mode == OptimizeMode::Explain {
370 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
372 }
373
374 let peek_plan = match create_fast_path_plan(
384 &mut df_desc,
385 self.select_id,
386 Some(&self.finishing),
387 self.config.features.persist_fast_path_limit,
388 self.config.persist_fast_path_order,
389 )? {
390 Some(plan) if !self.config.no_fast_path => {
391 if self.config.mode == OptimizeMode::Explain {
392 debug_span!(target: "optimizer", "fast_path").in_scope(|| {
394 let finishing = if !self.finishing.is_trivial(typ.arity()) {
396 Some(&self.finishing)
397 } else {
398 None
399 };
400 trace_plan(&plan.used_indexes(finishing));
401 });
402 }
403 trace_plan!(at: "fast_path", &plan);
405
406 trace_plan(&plan);
408
409 PeekPlan::FastPath(plan)
411 }
412 _ => {
413 soft_assert_or_log!(
414 !use_fast_path_optimizer || self.config.no_fast_path,
415 "The fast_path_optimizer shouldn't make a fast path plan slow path."
416 );
417
418 for build in df_desc.objects_to_build.iter_mut() {
420 normalize_lets(&mut build.plan.0, &self.config.features)?
421 }
422
423 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
427
428 trace_plan(&df_desc);
430
431 PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
433 }
434 };
435
436 self.duration += time.elapsed();
437 let label = match &peek_plan {
438 PeekPlan::FastPath(_) => "peek:fast_path",
439 PeekPlan::SlowPath(_) => "peek:slow_path",
440 };
441 self.metrics
442 .observe_e2e_optimization_time(label, self.duration);
443
444 Ok(GlobalLirPlan {
445 peek_plan,
446 df_meta,
447 typ,
448 })
449 }
450}
451
452impl GlobalLirPlan {
453 pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
455 (self.peek_plan, self.df_meta, self.typ)
456 }
457}