1use std::collections::BTreeSet;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32use mz_compute_types::plan::Plan;
33use mz_compute_types::sinks::{
34 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
35};
36use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
37use mz_repr::explain::trace_plan;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{ColumnName, GlobalId, RelationDesc, SqlRelationType};
40use mz_sql::optimizer_metrics::OptimizerMetrics;
41use mz_sql::plan::HirRelationExpr;
42use mz_transform::TransformCtx;
43use mz_transform::dataflow::DataflowMetainfo;
44use mz_transform::normalize_lets::normalize_lets;
45use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
46use timely::progress::Antichain;
47
48use crate::coord::infer_sql_type_for_catalog;
49use crate::optimize::dataflows::{
50 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
51};
52use crate::optimize::{
53 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
54 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
55};
56
/// Optimizer state for planning the dataflow behind a materialized view.
///
/// A single instance is driven through the `Optimize` stages implemented in
/// this file (HIR ⇒ local MIR ⇒ global MIR ⇒ LIR). The time spent in the
/// stages is accumulated in `duration` and reported by the final stage.
pub struct Optimizer {
    /// Typechecking context handed to every `TransformCtx` the stages build.
    typecheck_ctx: SharedTypecheckingContext,
    /// Catalog handle used to construct the `DataflowBuilder`.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Compute instance snapshot used to construct the `DataflowBuilder`.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the materialized view sink is exported from the dataflow.
    sink_id: GlobalId,
    /// Id of the view imported into the dataflow; the sink reads from it.
    view_id: GlobalId,
    /// Column names of the sink's `RelationDesc`.
    column_names: Vec<ColumnName>,
    /// Indexes of output columns that are forced to be non-nullable in the
    /// sink's relation type; also forwarded to the sink description.
    non_null_assertions: Vec<usize>,
    /// Optional refresh schedule, copied into both the dataflow description
    /// and the sink description.
    refresh_schedule: Option<RefreshSchedule>,
    /// Name given to the `MirDataflowDescription` that is built.
    debug_name: String,
    /// Optimizer configuration (features, mode) consulted by every stage.
    config: OptimizerConfig,
    /// Metrics handle passed to lowering and to the transform contexts; also
    /// receives the end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Total time spent in the optimization stages so far.
    duration: Duration,
    /// Source imports with these ids have their `monotonic` flag forced to
    /// `false` before global optimization runs.
    force_source_non_monotonic: BTreeSet<GlobalId>,
}
102
103impl Optimizer {
104 pub fn new(
105 catalog: Arc<dyn OptimizerCatalog>,
106 compute_instance: ComputeInstanceSnapshot,
107 sink_id: GlobalId,
108 view_id: GlobalId,
109 column_names: Vec<ColumnName>,
110 non_null_assertions: Vec<usize>,
111 refresh_schedule: Option<RefreshSchedule>,
112 debug_name: String,
113 config: OptimizerConfig,
114 metrics: OptimizerMetrics,
115 force_source_non_monotonic: BTreeSet<GlobalId>,
116 ) -> Self {
117 Self {
118 typecheck_ctx: empty_typechecking_context(),
119 catalog,
120 compute_instance,
121 sink_id,
122 view_id,
123 column_names,
124 non_null_assertions,
125 refresh_schedule,
126 debug_name,
127 config,
128 metrics,
129 duration: Default::default(),
130 force_source_non_monotonic,
131 }
132 }
133}
134
/// Output of the first optimization stage (`Optimize<HirRelationExpr>`):
/// a locally optimized MIR expression plus its metadata and relation type.
#[derive(Clone, Debug)]
pub struct LocalMirPlan {
    /// The MIR expression after lowering and local optimization.
    expr: MirRelationExpr,
    /// Metadata collected while running the local transforms.
    df_meta: DataflowMetainfo,
    /// Relation type of the expression; the next stage uses it (after
    /// applying the non-null assertions) to build the sink's `RelationDesc`.
    typ: SqlRelationType,
}
143
/// Output of the global MIR stage (`Optimize<LocalMirPlan>`): a MIR dataflow
/// description that already contains the exported sink, plus its metadata.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The globally optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metadata collected by the local and global transforms.
    df_meta: DataflowMetainfo,
}
154
155impl GlobalMirPlan {
156 pub fn df_desc(&self) -> &MirDataflowDescription {
157 &self.df_desc
158 }
159}
160
/// Output of the final stage (`Optimize<GlobalMirPlan>`): the finalized LIR
/// dataflow description together with the metadata carried over from MIR.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The LIR dataflow description produced by `Plan::finalize_dataflow`.
    df_desc: LirDataflowDescription,
    /// Metadata passed through unchanged from the `GlobalMirPlan`.
    df_meta: DataflowMetainfo,
}
168
169impl GlobalLirPlan {
170 pub fn df_desc(&self) -> &LirDataflowDescription {
171 &self.df_desc
172 }
173
174 pub fn df_meta(&self) -> &DataflowMetainfo {
175 &self.df_meta
176 }
177
178 pub fn desc(&self) -> &RelationDesc {
179 let sink_exports = &self.df_desc.sink_exports;
180 let sink = sink_exports.values().next().expect("valid sink");
181 &sink.from_desc
182 }
183}
184
185impl Optimize<HirRelationExpr> for Optimizer {
186 type To = LocalMirPlan;
187
188 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
189 let time = Instant::now();
190
191 trace_plan!(at: "raw", &expr);
193
194 let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
196
197 let mut df_meta = DataflowMetainfo::default();
199 let mut transform_ctx = TransformCtx::local(
200 &self.config.features,
201 &self.typecheck_ctx,
202 &mut df_meta,
203 Some(&mut self.metrics),
204 Some(self.view_id),
205 );
206 let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
207 let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
208
209 self.duration += time.elapsed();
210
211 Ok(LocalMirPlan {
213 expr: mir_expr,
214 df_meta,
215 typ,
216 })
217 }
218}
219
220impl LocalMirPlan {
221 pub fn expr(&self) -> OptimizedMirRelationExpr {
222 OptimizedMirRelationExpr(self.expr.clone())
223 }
224}
225
226impl Optimize<(OptimizedMirRelationExpr, SqlRelationType)> for Optimizer {
229 type To = GlobalMirPlan;
230
231 fn optimize(
232 &mut self,
233 (expr, typ): (OptimizedMirRelationExpr, SqlRelationType),
234 ) -> Result<Self::To, OptimizerError> {
235 let expr = expr.into_inner();
236 let df_meta = DataflowMetainfo::default();
237 self.optimize(LocalMirPlan { expr, df_meta, typ })
238 }
239}
240
241impl Optimize<LocalMirPlan> for Optimizer {
242 type To = GlobalMirPlan;
243
244 fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
245 let time = Instant::now();
246
247 let expr = OptimizedMirRelationExpr(plan.expr);
248 let mut df_meta = plan.df_meta;
249
250 let mut rel_typ = plan.typ;
251 for &i in self.non_null_assertions.iter() {
252 rel_typ.column_types[i].nullable = false;
253 }
254 let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
255
256 let mut df_builder = {
257 let compute = self.compute_instance.clone();
258 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
259 };
260 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
261
262 df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
263
264 df_builder.import_view_into_dataflow(
265 &self.view_id,
266 &expr,
267 &mut df_desc,
268 &self.config.features,
269 )?;
270 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
271
272 let sink_description = ComputeSinkDesc {
273 from: self.view_id,
274 from_desc: rel_desc.clone(),
275 connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
276 value_desc: rel_desc,
277 storage_metadata: (),
278 }),
279 with_snapshot: true,
280 up_to: Antichain::default(),
281 non_null_assertions: self.non_null_assertions.clone(),
282 refresh_schedule: self.refresh_schedule.clone(),
283 };
284 df_desc.export_sink(self.sink_id, sink_description);
285
286 let style = ExprPrepMaintained;
288 df_desc.visit_children(
289 |r| style.prep_relation_expr(r),
290 |s| style.prep_scalar_expr(s),
291 )?;
292
293 let mut transform_ctx = TransformCtx::global(
295 &df_builder,
296 &mz_transform::EmptyStatisticsOracle, &self.config.features,
298 &self.typecheck_ctx,
299 &mut df_meta,
300 Some(&mut self.metrics),
301 );
302 for id in self.force_source_non_monotonic.iter() {
304 if let Some(import) = df_desc.source_imports.get_mut(id) {
305 import.monotonic = false;
306 }
307 }
308 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
310
311 if self.config.mode == OptimizeMode::Explain {
312 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
314 }
315
316 self.duration += time.elapsed();
317
318 Ok(GlobalMirPlan { df_desc, df_meta })
320 }
321}
322
323impl Optimize<GlobalMirPlan> for Optimizer {
324 type To = GlobalLirPlan;
325
326 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
327 let time = Instant::now();
328
329 let GlobalMirPlan {
330 mut df_desc,
331 df_meta,
332 } = plan;
333
334 for build in df_desc.objects_to_build.iter_mut() {
336 normalize_lets(&mut build.plan.0, &self.config.features)?
337 }
338
339 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
343
344 trace_plan(&df_desc);
346
347 self.duration += time.elapsed();
348 self.metrics
349 .observe_e2e_optimization_time("materialized_view", self.duration);
350
351 Ok(GlobalLirPlan { df_desc, df_meta })
353 }
354}
355
356impl GlobalLirPlan {
357 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
359 (self.df_desc, self.df_meta)
360 }
361}