1use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::plan::Plan;
32use mz_compute_types::sinks::{
33 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
34};
35use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
36use mz_repr::explain::trace_plan;
37use mz_repr::refresh_schedule::RefreshSchedule;
38use mz_repr::{ColumnName, GlobalId, RelationDesc, SqlRelationType};
39use mz_sql::optimizer_metrics::OptimizerMetrics;
40use mz_sql::plan::HirRelationExpr;
41use mz_transform::TransformCtx;
42use mz_transform::dataflow::DataflowMetainfo;
43use mz_transform::normalize_lets::normalize_lets;
44use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
45use timely::progress::Antichain;
46
47use crate::coord::infer_sql_type_for_catalog;
48use crate::optimize::dataflows::{
49 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
50};
51use crate::optimize::{
52 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
53 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
54};
55
56pub struct Optimizer {
57 typecheck_ctx: SharedTypecheckingContext,
59 catalog: Arc<dyn OptimizerCatalog>,
61 compute_instance: ComputeInstanceSnapshot,
63 sink_id: GlobalId,
65 view_id: GlobalId,
67 column_names: Vec<ColumnName>,
69 non_null_assertions: Vec<usize>,
72 refresh_schedule: Option<RefreshSchedule>,
74 debug_name: String,
76 config: OptimizerConfig,
78 metrics: OptimizerMetrics,
80 duration: Duration,
82}
83
84impl Optimizer {
85 pub fn new(
86 catalog: Arc<dyn OptimizerCatalog>,
87 compute_instance: ComputeInstanceSnapshot,
88 sink_id: GlobalId,
89 view_id: GlobalId,
90 column_names: Vec<ColumnName>,
91 non_null_assertions: Vec<usize>,
92 refresh_schedule: Option<RefreshSchedule>,
93 debug_name: String,
94 config: OptimizerConfig,
95 metrics: OptimizerMetrics,
96 ) -> Self {
97 Self {
98 typecheck_ctx: empty_typechecking_context(),
99 catalog,
100 compute_instance,
101 sink_id,
102 view_id,
103 column_names,
104 non_null_assertions,
105 refresh_schedule,
106 debug_name,
107 config,
108 metrics,
109 duration: Default::default(),
110 }
111 }
112}
113
114#[derive(Clone, Debug)]
117pub struct LocalMirPlan {
118 expr: MirRelationExpr,
119 df_meta: DataflowMetainfo,
120 typ: SqlRelationType,
121}
122
123#[derive(Clone, Debug)]
129pub struct GlobalMirPlan {
130 df_desc: MirDataflowDescription,
131 df_meta: DataflowMetainfo,
132}
133
134impl GlobalMirPlan {
135 pub fn df_desc(&self) -> &MirDataflowDescription {
136 &self.df_desc
137 }
138}
139
140#[derive(Clone, Debug)]
143pub struct GlobalLirPlan {
144 df_desc: LirDataflowDescription,
145 df_meta: DataflowMetainfo,
146}
147
148impl GlobalLirPlan {
149 pub fn df_desc(&self) -> &LirDataflowDescription {
150 &self.df_desc
151 }
152
153 pub fn df_meta(&self) -> &DataflowMetainfo {
154 &self.df_meta
155 }
156
157 pub fn desc(&self) -> &RelationDesc {
158 let sink_exports = &self.df_desc.sink_exports;
159 let sink = sink_exports.values().next().expect("valid sink");
160 &sink.from_desc
161 }
162}
163
164impl Optimize<HirRelationExpr> for Optimizer {
165 type To = LocalMirPlan;
166
167 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
168 let time = Instant::now();
169
170 trace_plan!(at: "raw", &expr);
172
173 let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
175
176 let mut df_meta = DataflowMetainfo::default();
178 let mut transform_ctx = TransformCtx::local(
179 &self.config.features,
180 &self.typecheck_ctx,
181 &mut df_meta,
182 Some(&mut self.metrics),
183 Some(self.view_id),
184 );
185 let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
186 let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
187
188 self.duration += time.elapsed();
189
190 Ok(LocalMirPlan {
192 expr: mir_expr,
193 df_meta,
194 typ,
195 })
196 }
197}
198
199impl LocalMirPlan {
200 pub fn expr(&self) -> OptimizedMirRelationExpr {
201 OptimizedMirRelationExpr(self.expr.clone())
202 }
203}
204
205impl Optimize<(OptimizedMirRelationExpr, SqlRelationType)> for Optimizer {
208 type To = GlobalMirPlan;
209
210 fn optimize(
211 &mut self,
212 (expr, typ): (OptimizedMirRelationExpr, SqlRelationType),
213 ) -> Result<Self::To, OptimizerError> {
214 let expr = expr.into_inner();
215 let df_meta = DataflowMetainfo::default();
216 self.optimize(LocalMirPlan { expr, df_meta, typ })
217 }
218}
219
220impl Optimize<LocalMirPlan> for Optimizer {
221 type To = GlobalMirPlan;
222
223 fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
224 let time = Instant::now();
225
226 let expr = OptimizedMirRelationExpr(plan.expr);
227 let mut df_meta = plan.df_meta;
228
229 let mut rel_typ = plan.typ;
230 for &i in self.non_null_assertions.iter() {
231 rel_typ.column_types[i].nullable = false;
232 }
233 let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
234
235 let mut df_builder = {
236 let compute = self.compute_instance.clone();
237 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
238 };
239 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
240
241 df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
242
243 df_builder.import_view_into_dataflow(
244 &self.view_id,
245 &expr,
246 &mut df_desc,
247 &self.config.features,
248 )?;
249 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
250
251 let sink_description = ComputeSinkDesc {
252 from: self.view_id,
253 from_desc: rel_desc.clone(),
254 connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
255 value_desc: rel_desc,
256 storage_metadata: (),
257 }),
258 with_snapshot: true,
259 up_to: Antichain::default(),
260 non_null_assertions: self.non_null_assertions.clone(),
261 refresh_schedule: self.refresh_schedule.clone(),
262 };
263 df_desc.export_sink(self.sink_id, sink_description);
264
265 let style = ExprPrepMaintained;
267 df_desc.visit_children(
268 |r| style.prep_relation_expr(r),
269 |s| style.prep_scalar_expr(s),
270 )?;
271
272 let mut transform_ctx = TransformCtx::global(
274 &df_builder,
275 &mz_transform::EmptyStatisticsOracle, &self.config.features,
277 &self.typecheck_ctx,
278 &mut df_meta,
279 Some(&mut self.metrics),
280 );
281 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
283
284 if self.config.mode == OptimizeMode::Explain {
285 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
287 }
288
289 self.duration += time.elapsed();
290
291 Ok(GlobalMirPlan { df_desc, df_meta })
293 }
294}
295
296impl Optimize<GlobalMirPlan> for Optimizer {
297 type To = GlobalLirPlan;
298
299 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
300 let time = Instant::now();
301
302 let GlobalMirPlan {
303 mut df_desc,
304 df_meta,
305 } = plan;
306
307 for build in df_desc.objects_to_build.iter_mut() {
309 normalize_lets(&mut build.plan.0, &self.config.features)?
310 }
311
312 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
316
317 trace_plan(&df_desc);
319
320 self.duration += time.elapsed();
321 self.metrics
322 .observe_e2e_optimization_time("materialized_view", self.duration);
323
324 Ok(GlobalLirPlan { df_desc, df_meta })
326 }
327}
328
329impl GlobalLirPlan {
330 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
332 (self.df_desc, self.df_meta)
333 }
334}