1use std::collections::BTreeSet;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32use mz_compute_types::plan::Plan;
33use mz_compute_types::sinks::{
34 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
35};
36use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
37use mz_repr::explain::trace_plan;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{ColumnName, GlobalId, RelationDesc};
40use mz_sql::optimizer_metrics::OptimizerMetrics;
41use mz_sql::plan::HirRelationExpr;
42use mz_transform::TransformCtx;
43use mz_transform::dataflow::DataflowMetainfo;
44use mz_transform::normalize_lets::normalize_lets;
45use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
46use timely::progress::Antichain;
47
48use crate::optimize::dataflows::{
49 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
50};
51use crate::optimize::{
52 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
53 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
54};
55
56pub struct Optimizer {
57 typecheck_ctx: TypecheckContext,
59 catalog: Arc<dyn OptimizerCatalog>,
61 compute_instance: ComputeInstanceSnapshot,
63 sink_id: GlobalId,
65 view_id: GlobalId,
67 column_names: Vec<ColumnName>,
69 non_null_assertions: Vec<usize>,
72 refresh_schedule: Option<RefreshSchedule>,
74 debug_name: String,
76 config: OptimizerConfig,
78 metrics: OptimizerMetrics,
80 duration: Duration,
82 force_source_non_monotonic: BTreeSet<GlobalId>,
100}
101
102impl Optimizer {
103 pub fn new(
104 catalog: Arc<dyn OptimizerCatalog>,
105 compute_instance: ComputeInstanceSnapshot,
106 sink_id: GlobalId,
107 view_id: GlobalId,
108 column_names: Vec<ColumnName>,
109 non_null_assertions: Vec<usize>,
110 refresh_schedule: Option<RefreshSchedule>,
111 debug_name: String,
112 config: OptimizerConfig,
113 metrics: OptimizerMetrics,
114 force_source_non_monotonic: BTreeSet<GlobalId>,
115 ) -> Self {
116 Self {
117 typecheck_ctx: empty_context(),
118 catalog,
119 compute_instance,
120 sink_id,
121 view_id,
122 column_names,
123 non_null_assertions,
124 refresh_schedule,
125 debug_name,
126 config,
127 metrics,
128 duration: Default::default(),
129 force_source_non_monotonic,
130 }
131 }
132}
133
134#[derive(Clone, Debug)]
137pub struct LocalMirPlan {
138 expr: MirRelationExpr,
139 df_meta: DataflowMetainfo,
140}
141
142#[derive(Clone, Debug)]
148pub struct GlobalMirPlan {
149 df_desc: MirDataflowDescription,
150 df_meta: DataflowMetainfo,
151}
152
153impl GlobalMirPlan {
154 pub fn df_desc(&self) -> &MirDataflowDescription {
155 &self.df_desc
156 }
157}
158
159#[derive(Clone, Debug)]
162pub struct GlobalLirPlan {
163 df_desc: LirDataflowDescription,
164 df_meta: DataflowMetainfo,
165}
166
167impl GlobalLirPlan {
168 pub fn df_desc(&self) -> &LirDataflowDescription {
169 &self.df_desc
170 }
171
172 pub fn df_meta(&self) -> &DataflowMetainfo {
173 &self.df_meta
174 }
175
176 pub fn desc(&self) -> &RelationDesc {
177 let sink_exports = &self.df_desc.sink_exports;
178 let sink = sink_exports.values().next().expect("valid sink");
179 &sink.from_desc
180 }
181}
182
183impl Optimize<HirRelationExpr> for Optimizer {
184 type To = LocalMirPlan;
185
186 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
187 let time = Instant::now();
188
189 trace_plan!(at: "raw", &expr);
191
192 let expr = expr.lower(&self.config, Some(&self.metrics))?;
194
195 let mut df_meta = DataflowMetainfo::default();
197 let mut transform_ctx = TransformCtx::local(
198 &self.config.features,
199 &self.typecheck_ctx,
200 &mut df_meta,
201 Some(&self.metrics),
202 Some(self.view_id),
203 );
204 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
205
206 self.duration += time.elapsed();
207
208 Ok(LocalMirPlan { expr, df_meta })
210 }
211}
212
213impl LocalMirPlan {
214 pub fn expr(&self) -> OptimizedMirRelationExpr {
215 OptimizedMirRelationExpr(self.expr.clone())
216 }
217}
218
219impl Optimize<OptimizedMirRelationExpr> for Optimizer {
222 type To = GlobalMirPlan;
223
224 fn optimize(&mut self, expr: OptimizedMirRelationExpr) -> Result<Self::To, OptimizerError> {
225 let expr = expr.into_inner();
226 let df_meta = DataflowMetainfo::default();
227 self.optimize(LocalMirPlan { expr, df_meta })
228 }
229}
230
231impl Optimize<LocalMirPlan> for Optimizer {
232 type To = GlobalMirPlan;
233
234 fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
235 let time = Instant::now();
236
237 let expr = OptimizedMirRelationExpr(plan.expr);
238 let mut df_meta = plan.df_meta;
239
240 let mut rel_typ = expr.typ();
241 for &i in self.non_null_assertions.iter() {
242 rel_typ.column_types[i].nullable = false;
243 }
244 let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
245
246 let mut df_builder = {
247 let compute = self.compute_instance.clone();
248 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
249 };
250 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
251
252 df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
253
254 df_builder.import_view_into_dataflow(
255 &self.view_id,
256 &expr,
257 &mut df_desc,
258 &self.config.features,
259 )?;
260 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
261
262 let sink_description = ComputeSinkDesc {
263 from: self.view_id,
264 from_desc: rel_desc.clone(),
265 connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
266 value_desc: rel_desc,
267 storage_metadata: (),
268 }),
269 with_snapshot: true,
270 up_to: Antichain::default(),
271 non_null_assertions: self.non_null_assertions.clone(),
272 refresh_schedule: self.refresh_schedule.clone(),
273 };
274 df_desc.export_sink(self.sink_id, sink_description);
275
276 let style = ExprPrepStyle::Maintained;
278 df_desc.visit_children(
279 |r| prep_relation_expr(r, style),
280 |s| prep_scalar_expr(s, style),
281 )?;
282
283 let mut transform_ctx = TransformCtx::global(
285 &df_builder,
286 &mz_transform::EmptyStatisticsOracle, &self.config.features,
288 &self.typecheck_ctx,
289 &mut df_meta,
290 Some(&self.metrics),
291 );
292 for id in self.force_source_non_monotonic.iter() {
294 if let Some((_desc, monotonic, _upper)) = df_desc.source_imports.get_mut(id) {
295 *monotonic = false;
296 }
297 }
298 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
300
301 if self.config.mode == OptimizeMode::Explain {
302 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
304 }
305
306 self.duration += time.elapsed();
307
308 Ok(GlobalMirPlan { df_desc, df_meta })
310 }
311}
312
313impl Optimize<GlobalMirPlan> for Optimizer {
314 type To = GlobalLirPlan;
315
316 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
317 let time = Instant::now();
318
319 let GlobalMirPlan {
320 mut df_desc,
321 df_meta,
322 } = plan;
323
324 for build in df_desc.objects_to_build.iter_mut() {
326 normalize_lets(&mut build.plan.0, &self.config.features)?
327 }
328
329 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
333
334 trace_plan(&df_desc);
336
337 self.duration += time.elapsed();
338 self.metrics
339 .observe_e2e_optimization_time("materialized_view", self.duration);
340
341 Ok(GlobalLirPlan { df_desc, df_meta })
343 }
344}
345
346impl GlobalLirPlan {
347 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
349 (self.df_desc, self.df_meta)
350 }
351}