1use std::collections::BTreeSet;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32use mz_compute_types::plan::Plan;
33use mz_compute_types::sinks::{
34 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
35};
36use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
37use mz_repr::explain::trace_plan;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{ColumnName, GlobalId, RelationDesc};
40use mz_sql::optimizer_metrics::OptimizerMetrics;
41use mz_sql::plan::HirRelationExpr;
42use mz_transform::TransformCtx;
43use mz_transform::dataflow::DataflowMetainfo;
44use mz_transform::normalize_lets::normalize_lets;
45use mz_transform::reprtypecheck::{
46 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
47};
48use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
49use timely::progress::Antichain;
50
51use crate::optimize::dataflows::{
52 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
53};
54use crate::optimize::{
55 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
56 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
57};
58
/// Optimizer state for the materialized-view pipeline implemented in this
/// file.
///
/// The individual pipeline stages are the [`Optimize`] implementations on this
/// type; each stage adds the time it took to `duration`.
pub struct Optimizer {
    /// Typechecking context passed to both the local and the global
    /// `TransformCtx`. Starts out as `empty_context()`.
    typecheck_ctx: TypecheckContext,
    /// Representation-typechecking context passed to both the local and the
    /// global `TransformCtx`. Starts out as `empty_repr_context()`.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog handle the `DataflowBuilder` is constructed from.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Compute instance snapshot; a clone is handed to the `DataflowBuilder`.
    compute_instance: ComputeInstanceSnapshot,
    /// ID under which the materialized-view sink is exported from the
    /// dataflow.
    sink_id: GlobalId,
    /// ID under which the view expression is imported into the dataflow; the
    /// exported sink reads `from` this ID.
    view_id: GlobalId,
    /// Column names used to build the `RelationDesc` of the sink.
    column_names: Vec<ColumnName>,
    /// Indexes of output columns whose type is forced to non-nullable; also
    /// forwarded to the sink description.
    non_null_assertions: Vec<usize>,
    /// Optional refresh schedule, copied onto both the dataflow description
    /// and the sink description.
    refresh_schedule: Option<RefreshSchedule>,
    /// Name given to the `MirDataflowDescription` that is built.
    debug_name: String,
    /// Optimizer configuration (its `features` and `mode` are consulted by the
    /// stages).
    config: OptimizerConfig,
    /// Metrics handle passed to lowering and the transform contexts, and used
    /// to report the end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Total time spent in the optimization stages so far.
    duration: Duration,
    /// Source imports whose `monotonic` flag is forced to `false` right before
    /// the global dataflow optimization runs.
    force_source_non_monotonic: BTreeSet<GlobalId>,
}
106
107impl Optimizer {
108 pub fn new(
109 catalog: Arc<dyn OptimizerCatalog>,
110 compute_instance: ComputeInstanceSnapshot,
111 sink_id: GlobalId,
112 view_id: GlobalId,
113 column_names: Vec<ColumnName>,
114 non_null_assertions: Vec<usize>,
115 refresh_schedule: Option<RefreshSchedule>,
116 debug_name: String,
117 config: OptimizerConfig,
118 metrics: OptimizerMetrics,
119 force_source_non_monotonic: BTreeSet<GlobalId>,
120 ) -> Self {
121 Self {
122 typecheck_ctx: empty_context(),
123 repr_typecheck_ctx: empty_repr_context(),
124 catalog,
125 compute_instance,
126 sink_id,
127 view_id,
128 column_names,
129 non_null_assertions,
130 refresh_schedule,
131 debug_name,
132 config,
133 metrics,
134 duration: Default::default(),
135 force_source_non_monotonic,
136 }
137 }
138}
139
/// Result of the `Optimize<HirRelationExpr>` stage: the expression after
/// lowering and `optimize_mir_local`, together with the metainfo collected
/// while doing so.
#[derive(Clone, Debug)]
pub struct LocalMirPlan {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Metainfo filled in through the local `TransformCtx`.
    df_meta: DataflowMetainfo,
}
147
/// Result of the `Optimize<LocalMirPlan>` stage: the MIR dataflow description
/// after `mz_transform::optimize_dataflow`, together with its metainfo.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The optimized MIR dataflow description, including the exported sink.
    df_desc: MirDataflowDescription,
    /// Metainfo carried over from the local stage and extended through the
    /// global `TransformCtx`.
    df_meta: DataflowMetainfo,
}
158
159impl GlobalMirPlan {
160 pub fn df_desc(&self) -> &MirDataflowDescription {
161 &self.df_desc
162 }
163}
164
/// Result of the `Optimize<GlobalMirPlan>` stage: the dataflow description
/// returned by `Plan::finalize_dataflow`, together with its metainfo.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metainfo carried over unchanged from the `GlobalMirPlan`.
    df_meta: DataflowMetainfo,
}
172
173impl GlobalLirPlan {
174 pub fn df_desc(&self) -> &LirDataflowDescription {
175 &self.df_desc
176 }
177
178 pub fn df_meta(&self) -> &DataflowMetainfo {
179 &self.df_meta
180 }
181
182 pub fn desc(&self) -> &RelationDesc {
183 let sink_exports = &self.df_desc.sink_exports;
184 let sink = sink_exports.values().next().expect("valid sink");
185 &sink.from_desc
186 }
187}
188
189impl Optimize<HirRelationExpr> for Optimizer {
190 type To = LocalMirPlan;
191
192 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
193 let time = Instant::now();
194
195 trace_plan!(at: "raw", &expr);
197
198 let expr = expr.lower(&self.config, Some(&self.metrics))?;
200
201 let mut df_meta = DataflowMetainfo::default();
203 let mut transform_ctx = TransformCtx::local(
204 &self.config.features,
205 &self.typecheck_ctx,
206 &self.repr_typecheck_ctx,
207 &mut df_meta,
208 Some(&self.metrics),
209 Some(self.view_id),
210 );
211 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
212
213 self.duration += time.elapsed();
214
215 Ok(LocalMirPlan { expr, df_meta })
217 }
218}
219
220impl LocalMirPlan {
221 pub fn expr(&self) -> OptimizedMirRelationExpr {
222 OptimizedMirRelationExpr(self.expr.clone())
223 }
224}
225
226impl Optimize<OptimizedMirRelationExpr> for Optimizer {
229 type To = GlobalMirPlan;
230
231 fn optimize(&mut self, expr: OptimizedMirRelationExpr) -> Result<Self::To, OptimizerError> {
232 let expr = expr.into_inner();
233 let df_meta = DataflowMetainfo::default();
234 self.optimize(LocalMirPlan { expr, df_meta })
235 }
236}
237
238impl Optimize<LocalMirPlan> for Optimizer {
239 type To = GlobalMirPlan;
240
241 fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
242 let time = Instant::now();
243
244 let expr = OptimizedMirRelationExpr(plan.expr);
245 let mut df_meta = plan.df_meta;
246
247 let mut rel_typ = expr.typ();
248 for &i in self.non_null_assertions.iter() {
249 rel_typ.column_types[i].nullable = false;
250 }
251 let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
252
253 let mut df_builder = {
254 let compute = self.compute_instance.clone();
255 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
256 };
257 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
258
259 df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
260
261 df_builder.import_view_into_dataflow(
262 &self.view_id,
263 &expr,
264 &mut df_desc,
265 &self.config.features,
266 )?;
267 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
268
269 let sink_description = ComputeSinkDesc {
270 from: self.view_id,
271 from_desc: rel_desc.clone(),
272 connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
273 value_desc: rel_desc,
274 storage_metadata: (),
275 }),
276 with_snapshot: true,
277 up_to: Antichain::default(),
278 non_null_assertions: self.non_null_assertions.clone(),
279 refresh_schedule: self.refresh_schedule.clone(),
280 };
281 df_desc.export_sink(self.sink_id, sink_description);
282
283 let style = ExprPrepStyle::Maintained;
285 df_desc.visit_children(
286 |r| prep_relation_expr(r, style),
287 |s| prep_scalar_expr(s, style),
288 )?;
289
290 let mut transform_ctx = TransformCtx::global(
292 &df_builder,
293 &mz_transform::EmptyStatisticsOracle, &self.config.features,
295 &self.typecheck_ctx,
296 &self.repr_typecheck_ctx,
297 &mut df_meta,
298 Some(&self.metrics),
299 );
300 for id in self.force_source_non_monotonic.iter() {
302 if let Some((_desc, monotonic, _upper)) = df_desc.source_imports.get_mut(id) {
303 *monotonic = false;
304 }
305 }
306 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
308
309 if self.config.mode == OptimizeMode::Explain {
310 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
312 }
313
314 self.duration += time.elapsed();
315
316 Ok(GlobalMirPlan { df_desc, df_meta })
318 }
319}
320
321impl Optimize<GlobalMirPlan> for Optimizer {
322 type To = GlobalLirPlan;
323
324 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
325 let time = Instant::now();
326
327 let GlobalMirPlan {
328 mut df_desc,
329 df_meta,
330 } = plan;
331
332 for build in df_desc.objects_to_build.iter_mut() {
334 normalize_lets(&mut build.plan.0, &self.config.features)?
335 }
336
337 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
341
342 trace_plan(&df_desc);
344
345 self.duration += time.elapsed();
346 self.metrics
347 .observe_e2e_optimization_time("materialized_view", self.duration);
348
349 Ok(GlobalLirPlan { df_desc, df_meta })
351 }
352}
353
354impl GlobalLirPlan {
355 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
357 (self.df_desc, self.df_meta)
358 }
359}