1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, SqlRelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::debug_span;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
39use crate::optimize::dataflows::{
40 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
41 prep_scalar_expr,
42};
43use crate::optimize::{
44 MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
45 optimize_mir_local, trace_plan,
46};
47
/// Optimizer state for a single one-shot `SELECT` (peek).
///
/// The optimizer is driven in two stages through the [`Optimize`] trait:
/// first from [`HirRelationExpr`] to [`LocalMirPlan`], and then (once the plan
/// has been resolved with a timestamp, session and statistics) from
/// `LocalMirPlan<Resolved>` to [`GlobalLirPlan`].
pub struct Optimizer {
    /// Typecheck context handed to both the local and the global `TransformCtx`.
    typecheck_ctx: TypecheckContext,
    /// Repr-typecheck context handed to both the local and the global `TransformCtx`.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog whose `state()` is used to build dataflows and prepare expressions.
    catalog: Arc<Catalog>,
    /// Snapshot of the compute instance the dataflow is built against.
    compute_instance: ComputeInstanceSnapshot,
    /// Row set finishing passed to `create_fast_path_plan`.
    finishing: RowSetFinishing,
    /// ID under which the optimized view is imported into the dataflow.
    select_id: GlobalId,
    /// ID of the index exported on top of `select_id` (skipped in `Explain` mode).
    index_id: GlobalId,
    /// Optimizer configuration (features, mode, fast-path switches).
    config: OptimizerConfig,
    /// Metrics sink; receives the end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Wall-clock time accumulated across all `optimize` calls on this instance.
    duration: Duration,
}
70
71impl Optimizer {
72 pub fn new(
73 catalog: Arc<Catalog>,
74 compute_instance: ComputeInstanceSnapshot,
75 finishing: RowSetFinishing,
76 select_id: GlobalId,
77 index_id: GlobalId,
78 config: OptimizerConfig,
79 metrics: OptimizerMetrics,
80 ) -> Self {
81 Self {
82 typecheck_ctx: empty_context(),
83 repr_typecheck_ctx: empty_repr_context(),
84 catalog,
85 compute_instance,
86 finishing,
87 select_id,
88 index_id,
89 config,
90 metrics,
91 duration: Default::default(),
92 }
93 }
94
95 pub fn cluster_id(&self) -> ComputeInstanceId {
96 self.compute_instance.instance_id()
97 }
98
99 pub fn finishing(&self) -> &RowSetFinishing {
100 &self.finishing
101 }
102
103 pub fn select_id(&self) -> GlobalId {
104 self.select_id
105 }
106
107 pub fn index_id(&self) -> GlobalId {
108 self.index_id
109 }
110
111 pub fn config(&self) -> &OptimizerConfig {
112 &self.config
113 }
114
115 pub fn metrics(&self) -> &OptimizerMetrics {
116 &self.metrics
117 }
118
119 pub fn duration(&self) -> Duration {
120 self.duration
121 }
122}
123
124impl Debug for Optimizer {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct("OptimizePeek")
132 .field("config", &self.config)
133 .finish_non_exhaustive()
134 }
135}
136
/// Marker type used as the default context parameter of [`LocalMirPlan`].
///
/// A plan in this state carries no timestamp, session or statistics context
/// yet; `LocalMirPlan::resolve` swaps it for [`Resolved`].
pub struct Unresolved;
140
/// Result of the first optimization stage (`Optimize<HirRelationExpr>`).
///
/// The type parameter `T` records whether the plan has been given the context
/// required by the second stage: [`Unresolved`] (the default) or [`Resolved`].
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The MIR expression produced by `optimize_mir_local`.
    expr: MirRelationExpr,
    /// Metadata collected by the transform context while optimizing `expr`.
    df_meta: DataflowMetainfo,
    /// Stage-specific context (`Unresolved` or `Resolved`).
    context: T,
}
149
/// Context attached to a [`LocalMirPlan`] by `LocalMirPlan::resolve`, providing
/// what the global optimization stage needs in addition to the plan itself.
pub struct Resolved<'s> {
    /// Source of the dataflow's `as_of` antichain and, when a timestamp is
    /// available, of its `until` frontier.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle passed to the global `TransformCtx`.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used when preparing expressions (`ExprPrepStyle::OneShot`).
    session: &'s dyn SessionMetadata,
}
157
/// Final result of peek optimization, produced by the second stage
/// (`Optimize<LocalMirPlan<Resolved>>`).
///
/// Use `unapply` to take the parts out of the plan.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// Either a fast-path plan or a slow-path dataflow plan.
    peek_plan: PeekPlan,
    /// Metadata accumulated across both optimization stages.
    df_meta: DataflowMetainfo,
    /// Relation type of the optimized expression.
    typ: SqlRelationType,
}
172
173impl Optimize<HirRelationExpr> for Optimizer {
174 type To = LocalMirPlan;
175
176 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
177 let time = Instant::now();
178
179 trace_plan!(at: "raw", &expr);
181
182 let expr = expr.lower(&self.config, Some(&self.metrics))?;
184
185 let mut df_meta = DataflowMetainfo::default();
187 let mut transform_ctx = TransformCtx::local(
188 &self.config.features,
189 &self.typecheck_ctx,
190 &self.repr_typecheck_ctx,
191 &mut df_meta,
192 Some(&self.metrics),
193 Some(self.select_id),
194 );
195 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
196
197 self.duration += time.elapsed();
198
199 Ok(LocalMirPlan {
201 expr,
202 df_meta,
203 context: Unresolved,
204 })
205 }
206}
207
208impl LocalMirPlan<Unresolved> {
209 pub fn resolve(
212 self,
213 timestamp_ctx: TimestampContext<Timestamp>,
214 session: &dyn SessionMetadata,
215 stats: Box<dyn StatisticsOracle>,
216 ) -> LocalMirPlan<Resolved<'_>> {
217 LocalMirPlan {
218 expr: self.expr,
219 df_meta: self.df_meta,
220 context: Resolved {
221 timestamp_ctx,
222 session,
223 stats,
224 },
225 }
226 }
227}
228
229impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
230 type To = GlobalLirPlan;
231
232 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
233 let time = Instant::now();
234
235 let LocalMirPlan {
236 expr,
237 mut df_meta,
238 context:
239 Resolved {
240 timestamp_ctx,
241 stats,
242 session,
243 },
244 } = plan;
245
246 let expr = OptimizedMirRelationExpr(expr);
247
248 let typ = expr.typ();
252 let key = typ
253 .default_key()
254 .iter()
255 .map(|k| MirScalarExpr::column(*k))
256 .collect();
257
258 let mut df_builder = {
260 let catalog = self.catalog.state();
261 let compute = self.compute_instance.clone();
262 DataflowBuilder::new(catalog, compute).with_config(&self.config)
263 };
264
265 let debug_name = format!("oneshot-select-{}", self.select_id);
266 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
267
268 df_builder.import_view_into_dataflow(
269 &self.select_id,
270 &expr,
271 &mut df_desc,
272 &self.config.features,
273 )?;
274 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
275
276 let style = ExprPrepStyle::OneShot {
279 logical_time: EvalTime::Deferred,
280 session,
281 catalog_state: self.catalog.state(),
282 };
283 df_desc.visit_children(
284 |r| prep_relation_expr(r, style),
285 |s| prep_scalar_expr(s, style),
286 )?;
287
288 if self.config.mode != OptimizeMode::Explain {
292 df_desc.export_index(
293 self.index_id,
294 IndexDesc {
295 on_id: self.select_id,
296 key,
297 },
298 typ.clone(),
299 );
300 }
301
302 df_desc.set_as_of(timestamp_ctx.antichain());
304
305 let as_of = df_desc
307 .as_of
308 .clone()
309 .expect("as_of antichain")
310 .into_option()
311 .expect("unique as_of element");
312
313 let style = ExprPrepStyle::OneShot {
315 logical_time: EvalTime::Time(as_of),
316 session,
317 catalog_state: self.catalog.state(),
318 };
319 df_desc.visit_children(
320 |r| prep_relation_expr(r, style),
321 |s| prep_scalar_expr(s, style),
322 )?;
323
324 if let Some(until) = timestamp_ctx
334 .timestamp()
335 .and_then(Timestamp::try_step_forward)
336 {
337 df_desc.until = Antichain::from_elem(until);
338 }
339
340 let mut transform_ctx = TransformCtx::global(
342 &df_builder,
343 &*stats,
344 &self.config.features,
345 &self.typecheck_ctx,
346 &self.repr_typecheck_ctx,
347 &mut df_meta,
348 Some(&self.metrics),
349 );
350
351 let use_fast_path_optimizer = match create_fast_path_plan(
356 &mut df_desc,
357 self.select_id,
358 Some(&self.finishing),
359 self.config.features.persist_fast_path_limit,
360 self.config.persist_fast_path_order,
361 ) {
362 Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
363 Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
364 false
367 }
368 Err(e) => {
369 return Err(e);
370 }
371 };
372
373 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
375
376 if self.config.mode == OptimizeMode::Explain {
377 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
379 }
380
381 let peek_plan = match create_fast_path_plan(
391 &mut df_desc,
392 self.select_id,
393 Some(&self.finishing),
394 self.config.features.persist_fast_path_limit,
395 self.config.persist_fast_path_order,
396 )? {
397 Some(plan) if !self.config.no_fast_path => {
398 if self.config.mode == OptimizeMode::Explain {
399 debug_span!(target: "optimizer", "fast_path").in_scope(|| {
401 let finishing = if !self.finishing.is_trivial(typ.arity()) {
403 Some(&self.finishing)
404 } else {
405 None
406 };
407 trace_plan(&plan.used_indexes(finishing));
408 });
409 }
410 trace_plan!(at: "fast_path", &plan);
412
413 trace_plan(&plan);
415
416 PeekPlan::FastPath(plan)
418 }
419 _ => {
420 soft_assert_or_log!(
421 !use_fast_path_optimizer || self.config.no_fast_path,
422 "The fast_path_optimizer shouldn't make a fast path plan slow path."
423 );
424
425 for build in df_desc.objects_to_build.iter_mut() {
427 normalize_lets(&mut build.plan.0, &self.config.features)?
428 }
429
430 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
434
435 trace_plan(&df_desc);
437
438 PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
440 }
441 };
442
443 self.duration += time.elapsed();
444 let label = match &peek_plan {
445 PeekPlan::FastPath(_) => "peek:fast_path",
446 PeekPlan::SlowPath(_) => "peek:slow_path",
447 };
448 self.metrics
449 .observe_e2e_optimization_time(label, self.duration);
450
451 Ok(GlobalLirPlan {
452 peek_plan,
453 df_meta,
454 typ,
455 })
456 }
457}
458
459impl GlobalLirPlan {
460 pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
462 (self.peek_plan, self.df_meta, self.typ)
463 }
464}