1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, RelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
29use mz_transform::{StatisticsOracle, TransformCtx};
30use timely::progress::Antichain;
31use tracing::debug_span;
32
33use crate::TimestampContext;
34use crate::catalog::Catalog;
35use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
36use crate::optimize::dataflows::{
37 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
38 prep_scalar_expr,
39};
40use crate::optimize::{
41 MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
42 optimize_mir_local, trace_plan,
43};
44
/// Optimizer state for a one-shot `SELECT` (peek).
///
/// An instance is driven through the `Optimize` implementations in this file;
/// the fields below are the inputs fixed at construction time plus the
/// bookkeeping that is shared between those stages.
pub struct Optimizer {
    /// Typechecking context handed to every `TransformCtx` this optimizer builds.
    typecheck_ctx: TypecheckContext,
    /// Catalog handle; its `state()` is consulted when building the dataflow
    /// and when preparing expressions.
    catalog: Arc<Catalog>,
    /// Snapshot of the compute instance the peek is optimized against.
    compute_instance: ComputeInstanceSnapshot,
    /// Finishing of the peek, passed to `create_fast_path_plan`.
    finishing: RowSetFinishing,
    /// ID under which the select expression is imported into the dataflow.
    select_id: GlobalId,
    /// ID of the index exported on `select_id` (and read by a slow-path peek).
    index_id: GlobalId,
    /// Optimizer configuration (mode, feature flags, fast-path switches).
    config: OptimizerConfig,
    /// Metrics sink; also receives the end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Wall-clock time accumulated across all `optimize` calls on this instance.
    duration: Duration,
}
65
66impl Optimizer {
67 pub fn new(
68 catalog: Arc<Catalog>,
69 compute_instance: ComputeInstanceSnapshot,
70 finishing: RowSetFinishing,
71 select_id: GlobalId,
72 index_id: GlobalId,
73 config: OptimizerConfig,
74 metrics: OptimizerMetrics,
75 ) -> Self {
76 Self {
77 typecheck_ctx: empty_context(),
78 catalog,
79 compute_instance,
80 finishing,
81 select_id,
82 index_id,
83 config,
84 metrics,
85 duration: Default::default(),
86 }
87 }
88
89 pub fn cluster_id(&self) -> ComputeInstanceId {
90 self.compute_instance.instance_id()
91 }
92
93 pub fn finishing(&self) -> &RowSetFinishing {
94 &self.finishing
95 }
96
97 pub fn select_id(&self) -> GlobalId {
98 self.select_id
99 }
100
101 pub fn index_id(&self) -> GlobalId {
102 self.index_id
103 }
104
105 pub fn config(&self) -> &OptimizerConfig {
106 &self.config
107 }
108
109 pub fn metrics(&self) -> &OptimizerMetrics {
110 &self.metrics
111 }
112
113 pub fn duration(&self) -> Duration {
114 self.duration
115 }
116}
117
118impl Debug for Optimizer {
124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125 f.debug_struct("OptimizePeek")
126 .field("config", &self.config)
127 .finish_non_exhaustive()
128 }
129}
130
/// Marker type for a [`LocalMirPlan`] that has no context attached yet.
///
/// It carries no data and is the default for the `T` parameter of
/// [`LocalMirPlan`].
pub struct Unresolved;
134
/// Result of the first optimization stage (the `Optimize<HirRelationExpr>`
/// implementation in this file).
///
/// The `T` parameter records whether the plan already carries the context
/// needed by the next stage: [`Unresolved`] (the default) or [`Resolved`].
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The MIR expression produced by the local optimization stage.
    expr: MirRelationExpr,
    /// Metainfo collected while optimizing `expr`.
    df_meta: DataflowMetainfo,
    /// Stage context; see the type-level docs for the possible states.
    context: T,
}
143
/// Context attached to a [`LocalMirPlan`] before it is handed to the second
/// optimization stage.
pub struct Resolved<'s> {
    /// Timestamp context; supplies the dataflow's `as_of` and, when a timestamp
    /// is available, its `until`.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle passed to the global transform context.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used when preparing expressions for one-shot evaluation.
    session: &'s dyn SessionMetadata,
}
151
/// Final result of peek optimization, produced by the
/// `Optimize<LocalMirPlan<Resolved>>` implementation in this file.
///
/// Use [`GlobalLirPlan::unapply`] to take the parts out.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// The plan for the peek: either a fast-path or a slow-path plan.
    peek_plan: PeekPlan,
    /// Metainfo collected during both optimization stages.
    df_meta: DataflowMetainfo,
    /// Relation type of the optimized select expression.
    typ: RelationType,
}
166
167impl Optimize<HirRelationExpr> for Optimizer {
168 type To = LocalMirPlan;
169
170 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
171 let time = Instant::now();
172
173 trace_plan!(at: "raw", &expr);
175
176 let expr = expr.lower(&self.config, Some(&self.metrics))?;
178
179 let mut df_meta = DataflowMetainfo::default();
181 let mut transform_ctx = TransformCtx::local(
182 &self.config.features,
183 &self.typecheck_ctx,
184 &mut df_meta,
185 Some(&self.metrics),
186 Some(self.select_id),
187 );
188 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
189
190 self.duration += time.elapsed();
191
192 Ok(LocalMirPlan {
194 expr,
195 df_meta,
196 context: Unresolved,
197 })
198 }
199}
200
201impl LocalMirPlan<Unresolved> {
202 pub fn resolve(
205 self,
206 timestamp_ctx: TimestampContext<Timestamp>,
207 session: &dyn SessionMetadata,
208 stats: Box<dyn StatisticsOracle>,
209 ) -> LocalMirPlan<Resolved<'_>> {
210 LocalMirPlan {
211 expr: self.expr,
212 df_meta: self.df_meta,
213 context: Resolved {
214 timestamp_ctx,
215 session,
216 stats,
217 },
218 }
219 }
220}
221
222impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
223 type To = GlobalLirPlan;
224
225 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
226 let time = Instant::now();
227
228 let LocalMirPlan {
229 expr,
230 mut df_meta,
231 context:
232 Resolved {
233 timestamp_ctx,
234 stats,
235 session,
236 },
237 } = plan;
238
239 let expr = OptimizedMirRelationExpr(expr);
240
241 let typ = expr.typ();
245 let key = typ
246 .default_key()
247 .iter()
248 .map(|k| MirScalarExpr::column(*k))
249 .collect();
250
251 let mut df_builder = {
253 let catalog = self.catalog.state();
254 let compute = self.compute_instance.clone();
255 DataflowBuilder::new(catalog, compute).with_config(&self.config)
256 };
257
258 let debug_name = format!("oneshot-select-{}", self.select_id);
259 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
260
261 df_builder.import_view_into_dataflow(
262 &self.select_id,
263 &expr,
264 &mut df_desc,
265 &self.config.features,
266 )?;
267 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
268
269 let style = ExprPrepStyle::OneShot {
272 logical_time: EvalTime::Deferred,
273 session,
274 catalog_state: self.catalog.state(),
275 };
276 df_desc.visit_children(
277 |r| prep_relation_expr(r, style),
278 |s| prep_scalar_expr(s, style),
279 )?;
280
281 if self.config.mode != OptimizeMode::Explain {
285 df_desc.export_index(
286 self.index_id,
287 IndexDesc {
288 on_id: self.select_id,
289 key,
290 },
291 typ.clone(),
292 );
293 }
294
295 df_desc.set_as_of(timestamp_ctx.antichain());
297
298 if let Some(until) = timestamp_ctx
308 .timestamp()
309 .and_then(Timestamp::try_step_forward)
310 {
311 df_desc.until = Antichain::from_elem(until);
312 }
313
314 let mut transform_ctx = TransformCtx::global(
316 &df_builder,
317 &*stats,
318 &self.config.features,
319 &self.typecheck_ctx,
320 &mut df_meta,
321 Some(&self.metrics),
322 );
323
324 let use_fast_path_optimizer = match create_fast_path_plan(
329 &mut df_desc,
330 self.select_id,
331 Some(&self.finishing),
332 self.config.features.persist_fast_path_limit,
333 self.config.persist_fast_path_order,
334 ) {
335 Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
336 Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
337 false
340 }
341 Err(e) => {
342 return Err(e);
343 }
344 };
345
346 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
348
349 if self.config.mode == OptimizeMode::Explain {
350 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
352 }
353
354 let as_of = df_desc
356 .as_of
357 .clone()
358 .expect("as_of antichain")
359 .into_option()
360 .expect("unique as_of element");
361
362 let style = ExprPrepStyle::OneShot {
364 logical_time: EvalTime::Time(as_of),
365 session,
366 catalog_state: self.catalog.state(),
367 };
368 df_desc.visit_children(
369 |r| prep_relation_expr(r, style),
370 |s| prep_scalar_expr(s, style),
371 )?;
372
373 let peek_plan = match create_fast_path_plan(
383 &mut df_desc,
384 self.select_id,
385 Some(&self.finishing),
386 self.config.features.persist_fast_path_limit,
387 self.config.persist_fast_path_order,
388 )? {
389 Some(plan) if !self.config.no_fast_path => {
390 if self.config.mode == OptimizeMode::Explain {
391 debug_span!(target: "optimizer", "fast_path").in_scope(|| {
393 let finishing = if !self.finishing.is_trivial(typ.arity()) {
395 Some(&self.finishing)
396 } else {
397 None
398 };
399 trace_plan(&plan.used_indexes(finishing));
400 });
401 }
402 trace_plan!(at: "fast_path", &plan);
404
405 trace_plan(&plan);
407
408 PeekPlan::FastPath(plan)
410 }
411 _ => {
412 soft_assert_or_log!(
413 !use_fast_path_optimizer || self.config.no_fast_path,
414 "The fast_path_optimizer shouldn't make a fast path plan slow path."
415 );
416
417 for build in df_desc.objects_to_build.iter_mut() {
419 normalize_lets(&mut build.plan.0, &self.config.features)?
420 }
421
422 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
426
427 trace_plan(&df_desc);
429
430 PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
432 }
433 };
434
435 self.duration += time.elapsed();
436 let label = match &peek_plan {
437 PeekPlan::FastPath(_) => "peek:fast_path",
438 PeekPlan::SlowPath(_) => "peek:slow_path",
439 };
440 self.metrics
441 .observe_e2e_optimization_time(label, self.duration);
442
443 Ok(GlobalLirPlan {
444 peek_plan,
445 df_meta,
446 typ,
447 })
448 }
449}
450
451impl GlobalLirPlan {
452 pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, RelationType) {
454 (self.peek_plan, self.df_meta, self.typ)
455 }
456}