1use std::collections::BTreeSet;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32use mz_compute_types::plan::Plan;
33use mz_compute_types::sinks::{
34 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
35};
36use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
37use mz_repr::explain::trace_plan;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{ColumnName, GlobalId, RelationDesc, SqlRelationType};
40use mz_sql::optimizer_metrics::OptimizerMetrics;
41use mz_sql::plan::HirRelationExpr;
42use mz_transform::TransformCtx;
43use mz_transform::dataflow::DataflowMetainfo;
44use mz_transform::normalize_lets::normalize_lets;
45use mz_transform::reprtypecheck::{
46 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
47};
48use timely::progress::Antichain;
49
50use crate::coord::infer_sql_type_for_catalog;
51use crate::optimize::dataflows::{
52 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
53};
54use crate::optimize::{
55 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
56 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
57};
58
/// Optimizer for a materialized view.
///
/// Drives a view definition through the pipeline HIR ⇒ locally optimized MIR
/// ([`LocalMirPlan`]) ⇒ globally optimized MIR dataflow ([`GlobalMirPlan`]) ⇒
/// LIR dataflow ([`GlobalLirPlan`]) via its [`Optimize`] implementations.
pub struct Optimizer {
    /// Representation-typechecking context handed to both the local and the
    /// global `TransformCtx`. Starts out empty (see [`Optimizer::new`]).
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog used to construct the `DataflowBuilder` for the global MIR stage.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Snapshot of the compute instance the dataflow is built for; cloned into
    /// the `DataflowBuilder`.
    compute_instance: ComputeInstanceSnapshot,
    /// ID under which the materialized-view sink is exported from the dataflow.
    sink_id: GlobalId,
    /// ID of the view. Used as the target id of the local `TransformCtx`, when
    /// importing the view into the dataflow, and as the sink's `from`.
    view_id: GlobalId,
    /// Column names for the sink's `RelationDesc`.
    column_names: Vec<ColumnName>,
    /// Indices of columns asserted to be non-null. These columns are marked
    /// non-nullable in the sink's `RelationDesc`, and the indices are also
    /// forwarded unchanged to the sink description.
    non_null_assertions: Vec<usize>,
    /// Optional refresh schedule, copied onto both the dataflow description and
    /// the sink description.
    refresh_schedule: Option<RefreshSchedule>,
    /// Name given to the constructed `MirDataflowDescription`.
    debug_name: String,
    /// Optimizer configuration (feature flags, optimize mode, ...).
    config: OptimizerConfig,
    /// Optimizer metrics. Passed to lowering and transforms, and used to record
    /// the end-to-end optimization time in the final stage.
    metrics: OptimizerMetrics,
    /// Time spent in the optimization stages run so far. Each stage adds its own
    /// elapsed time; the final (MIR ⇒ LIR) stage reports the accumulated total.
    duration: Duration,
    /// Source imports whose `monotonic` flag is forced to `false` before global
    /// MIR optimization runs.
    force_source_non_monotonic: BTreeSet<GlobalId>,
}
104
105impl Optimizer {
106 pub fn new(
107 catalog: Arc<dyn OptimizerCatalog>,
108 compute_instance: ComputeInstanceSnapshot,
109 sink_id: GlobalId,
110 view_id: GlobalId,
111 column_names: Vec<ColumnName>,
112 non_null_assertions: Vec<usize>,
113 refresh_schedule: Option<RefreshSchedule>,
114 debug_name: String,
115 config: OptimizerConfig,
116 metrics: OptimizerMetrics,
117 force_source_non_monotonic: BTreeSet<GlobalId>,
118 ) -> Self {
119 Self {
120 repr_typecheck_ctx: empty_repr_context(),
121 catalog,
122 compute_instance,
123 sink_id,
124 view_id,
125 column_names,
126 non_null_assertions,
127 refresh_schedule,
128 debug_name,
129 config,
130 metrics,
131 duration: Default::default(),
132 force_source_non_monotonic,
133 }
134 }
135}
136
/// The (sealed intermediate) result of the first stage: HIR ⇒ MIR lowering
/// followed by local MIR optimization.
#[derive(Clone, Debug)]
pub struct LocalMirPlan {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Metainfo collected by the local transform context.
    df_meta: DataflowMetainfo,
    /// SQL-level relation type of the view, as computed by
    /// `infer_sql_type_for_catalog` (or supplied by the caller when resuming
    /// from an already-optimized expression).
    typ: SqlRelationType,
}
145
/// The (sealed intermediate) result of the second stage: the view embedded in a
/// `MirDataflowDescription` that exports a materialized-view sink, after global
/// MIR optimization of the whole dataflow.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The globally optimized MIR dataflow.
    df_desc: MirDataflowDescription,
    /// Metainfo accumulated by the local and global transform contexts.
    df_meta: DataflowMetainfo,
}

impl GlobalMirPlan {
    /// Borrows the globally optimized MIR dataflow description.
    pub fn df_desc(&self) -> &MirDataflowDescription {
        &self.df_desc
    }
}
162
163#[derive(Clone, Debug)]
166pub struct GlobalLirPlan {
167 df_desc: LirDataflowDescription,
168 df_meta: DataflowMetainfo,
169}
170
171impl GlobalLirPlan {
172 pub fn df_desc(&self) -> &LirDataflowDescription {
173 &self.df_desc
174 }
175
176 pub fn df_meta(&self) -> &DataflowMetainfo {
177 &self.df_meta
178 }
179
180 pub fn desc(&self) -> &RelationDesc {
181 let sink_exports = &self.df_desc.sink_exports;
182 let sink = sink_exports.values().next().expect("valid sink");
183 &sink.from_desc
184 }
185}
186
187impl Optimize<HirRelationExpr> for Optimizer {
188 type To = LocalMirPlan;
189
190 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
191 let time = Instant::now();
192
193 trace_plan!(at: "raw", &expr);
195
196 let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
198
199 let mut df_meta = DataflowMetainfo::default();
201 let mut transform_ctx = TransformCtx::local(
202 &self.config.features,
203 &self.repr_typecheck_ctx,
204 &mut df_meta,
205 Some(&mut self.metrics),
206 Some(self.view_id),
207 );
208 let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
209 let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
210
211 self.duration += time.elapsed();
212
213 Ok(LocalMirPlan {
215 expr: mir_expr,
216 df_meta,
217 typ,
218 })
219 }
220}
221
222impl LocalMirPlan {
223 pub fn expr(&self) -> OptimizedMirRelationExpr {
224 OptimizedMirRelationExpr(self.expr.clone())
225 }
226}
227
228impl Optimize<(OptimizedMirRelationExpr, SqlRelationType)> for Optimizer {
231 type To = GlobalMirPlan;
232
233 fn optimize(
234 &mut self,
235 (expr, typ): (OptimizedMirRelationExpr, SqlRelationType),
236 ) -> Result<Self::To, OptimizerError> {
237 let expr = expr.into_inner();
238 let df_meta = DataflowMetainfo::default();
239 self.optimize(LocalMirPlan { expr, df_meta, typ })
240 }
241}
242
impl Optimize<LocalMirPlan> for Optimizer {
    type To = GlobalMirPlan;

    /// Builds the MIR dataflow that maintains the view and optimizes it globally.
    ///
    /// Steps, in order:
    /// 1. Derive the sink's `RelationDesc` from the plan's type: columns listed
    ///    in `non_null_assertions` are marked non-nullable, and columns are
    ///    named with `column_names`.
    /// 2. Create a dataflow named `debug_name` carrying `refresh_schedule`,
    ///    import the view into it, and let the builder possibly re-optimize
    ///    imported views.
    /// 3. Export a materialized-view sink with id `sink_id` reading from
    ///    `view_id`.
    /// 4. Prepare all expressions in the dataflow with `ExprPrepMaintained`.
    /// 5. Force `monotonic = false` on imported sources listed in
    ///    `force_source_non_monotonic`.
    /// 6. Run global MIR optimization over the whole dataflow.
    ///
    /// Time spent here is added to `self.duration`.
    ///
    /// # Panics
    ///
    /// Panics if an index in `non_null_assertions` is out of bounds for the
    /// plan's column types (callers presumably validate these — TODO confirm).
    fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let expr = OptimizedMirRelationExpr(plan.expr);
        let mut df_meta = plan.df_meta;

        // Apply the NOT NULL assertions to the type used for the sink's desc.
        let mut rel_typ = plan.typ;
        for &i in self.non_null_assertions.iter() {
            rel_typ.column_types[i].nullable = false;
        }
        let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());

        let mut df_builder = {
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
        };
        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());

        df_desc.refresh_schedule.clone_from(&self.refresh_schedule);

        df_builder.import_view_into_dataflow(
            &self.view_id,
            &expr,
            &mut df_desc,
            &self.config.features,
        )?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // The same desc is used both as the sink's input desc and as the
        // materialized view's value desc.
        let sink_description = ComputeSinkDesc {
            from: self.view_id,
            from_desc: rel_desc.clone(),
            connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
                value_desc: rel_desc,
                storage_metadata: (),
            }),
            with_snapshot: true,
            // Empty antichain — presumably "no `up_to` bound"; confirm against
            // the sink's consumers.
            up_to: Antichain::default(),
            non_null_assertions: self.non_null_assertions.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
        };
        df_desc.export_sink(self.sink_id, sink_description);

        // Prepare expressions in the assembled dataflow.
        let style = ExprPrepMaintained;
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // Construct the transform context for global optimization. No real
        // statistics are supplied; the empty oracle is used.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &mz_transform::EmptyStatisticsOracle,
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );
        // Apply source monotonicity overrides. This must happen before
        // `optimize_dataflow` so the transforms see the overridden flags.
        for id in self.force_source_non_monotonic.iter() {
            if let Some(import) = df_desc.source_imports.get_mut(id) {
                import.monotonic = false;
            }
        }
        // Run global optimization. NOTE(review): the meaning of the trailing
        // `false` argument is defined in `mz_transform` — confirm there.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        if self.config.mode == OptimizeMode::Explain {
            // Trace the indexes used by the dataflow at this point.
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        self.duration += time.elapsed();

        Ok(GlobalMirPlan { df_desc, df_meta })
    }
}
324
325impl Optimize<GlobalMirPlan> for Optimizer {
326 type To = GlobalLirPlan;
327
328 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
329 let time = Instant::now();
330
331 let GlobalMirPlan {
332 mut df_desc,
333 df_meta,
334 } = plan;
335
336 for build in df_desc.objects_to_build.iter_mut() {
338 normalize_lets(&mut build.plan.0, &self.config.features)?
339 }
340
341 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
345
346 trace_plan(&df_desc);
348
349 self.duration += time.elapsed();
350 self.metrics
351 .observe_e2e_optimization_time("materialized_view", self.duration);
352
353 Ok(GlobalLirPlan { df_desc, df_meta })
355 }
356}
357
358impl GlobalLirPlan {
359 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
361 (self.df_desc, self.df_meta)
362 }
363}