1use std::collections::BTreeSet;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32use mz_compute_types::plan::Plan;
33use mz_compute_types::sinks::{
34 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
35};
36use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
37use mz_repr::explain::trace_plan;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{ColumnName, GlobalId, RelationDesc};
40use mz_sql::optimizer_metrics::OptimizerMetrics;
41use mz_sql::plan::HirRelationExpr;
42use mz_transform::TransformCtx;
43use mz_transform::dataflow::DataflowMetainfo;
44use mz_transform::normalize_lets::normalize_lets;
45use mz_transform::reprtypecheck::{
46 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
47};
48use timely::progress::Antichain;
49
50use crate::optimize::dataflows::{
51 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
52};
53use crate::optimize::{
54 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
55 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
56};
57
58pub struct Optimizer {
59 repr_typecheck_ctx: ReprTypecheckContext,
61 catalog: Arc<dyn OptimizerCatalog>,
63 compute_instance: ComputeInstanceSnapshot,
65 sink_id: GlobalId,
67 view_id: GlobalId,
69 column_names: Vec<ColumnName>,
71 non_null_assertions: Vec<usize>,
74 refresh_schedule: Option<RefreshSchedule>,
76 debug_name: String,
78 config: OptimizerConfig,
80 metrics: OptimizerMetrics,
82 duration: Duration,
84 force_source_non_monotonic: BTreeSet<GlobalId>,
102}
103
104impl Optimizer {
105 pub fn new(
106 catalog: Arc<dyn OptimizerCatalog>,
107 compute_instance: ComputeInstanceSnapshot,
108 sink_id: GlobalId,
109 view_id: GlobalId,
110 column_names: Vec<ColumnName>,
111 non_null_assertions: Vec<usize>,
112 refresh_schedule: Option<RefreshSchedule>,
113 debug_name: String,
114 config: OptimizerConfig,
115 metrics: OptimizerMetrics,
116 force_source_non_monotonic: BTreeSet<GlobalId>,
117 ) -> Self {
118 Self {
119 repr_typecheck_ctx: empty_repr_context(),
120 catalog,
121 compute_instance,
122 sink_id,
123 view_id,
124 column_names,
125 non_null_assertions,
126 refresh_schedule,
127 debug_name,
128 config,
129 metrics,
130 duration: Default::default(),
131 force_source_non_monotonic,
132 }
133 }
134}
135
136#[derive(Clone, Debug)]
139pub struct LocalMirPlan {
140 expr: MirRelationExpr,
141 df_meta: DataflowMetainfo,
142}
143
144#[derive(Clone, Debug)]
150pub struct GlobalMirPlan {
151 df_desc: MirDataflowDescription,
152 df_meta: DataflowMetainfo,
153}
154
155impl GlobalMirPlan {
156 pub fn df_desc(&self) -> &MirDataflowDescription {
157 &self.df_desc
158 }
159}
160
161#[derive(Clone, Debug)]
164pub struct GlobalLirPlan {
165 df_desc: LirDataflowDescription,
166 df_meta: DataflowMetainfo,
167}
168
169impl GlobalLirPlan {
170 pub fn df_desc(&self) -> &LirDataflowDescription {
171 &self.df_desc
172 }
173
174 pub fn df_meta(&self) -> &DataflowMetainfo {
175 &self.df_meta
176 }
177
178 pub fn desc(&self) -> &RelationDesc {
179 let sink_exports = &self.df_desc.sink_exports;
180 let sink = sink_exports.values().next().expect("valid sink");
181 &sink.from_desc
182 }
183}
184
185impl Optimize<HirRelationExpr> for Optimizer {
186 type To = LocalMirPlan;
187
188 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
189 let time = Instant::now();
190
191 trace_plan!(at: "raw", &expr);
193
194 let expr = expr.lower(&self.config, Some(&self.metrics))?;
196
197 let mut df_meta = DataflowMetainfo::default();
199 let mut transform_ctx = TransformCtx::local(
200 &self.config.features,
201 &self.repr_typecheck_ctx,
202 &mut df_meta,
203 Some(&mut self.metrics),
204 Some(self.view_id),
205 );
206 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
207
208 self.duration += time.elapsed();
209
210 Ok(LocalMirPlan { expr, df_meta })
212 }
213}
214
215impl LocalMirPlan {
216 pub fn expr(&self) -> OptimizedMirRelationExpr {
217 OptimizedMirRelationExpr(self.expr.clone())
218 }
219}
220
221impl Optimize<OptimizedMirRelationExpr> for Optimizer {
224 type To = GlobalMirPlan;
225
226 fn optimize(&mut self, expr: OptimizedMirRelationExpr) -> Result<Self::To, OptimizerError> {
227 let expr = expr.into_inner();
228 let df_meta = DataflowMetainfo::default();
229 self.optimize(LocalMirPlan { expr, df_meta })
230 }
231}
232
233impl Optimize<LocalMirPlan> for Optimizer {
234 type To = GlobalMirPlan;
235
236 fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
237 let time = Instant::now();
238
239 let expr = OptimizedMirRelationExpr(plan.expr);
240 let mut df_meta = plan.df_meta;
241
242 let mut rel_typ = expr.typ();
243 for &i in self.non_null_assertions.iter() {
244 rel_typ.column_types[i].nullable = false;
245 }
246 let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
247
248 let mut df_builder = {
249 let compute = self.compute_instance.clone();
250 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
251 };
252 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
253
254 df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
255
256 df_builder.import_view_into_dataflow(
257 &self.view_id,
258 &expr,
259 &mut df_desc,
260 &self.config.features,
261 )?;
262 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
263
264 let sink_description = ComputeSinkDesc {
265 from: self.view_id,
266 from_desc: rel_desc.clone(),
267 connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
268 value_desc: rel_desc,
269 storage_metadata: (),
270 }),
271 with_snapshot: true,
272 up_to: Antichain::default(),
273 non_null_assertions: self.non_null_assertions.clone(),
274 refresh_schedule: self.refresh_schedule.clone(),
275 };
276 df_desc.export_sink(self.sink_id, sink_description);
277
278 let style = ExprPrepStyle::Maintained;
280 df_desc.visit_children(
281 |r| prep_relation_expr(r, style),
282 |s| prep_scalar_expr(s, style),
283 )?;
284
285 let mut transform_ctx = TransformCtx::global(
287 &df_builder,
288 &mz_transform::EmptyStatisticsOracle, &self.config.features,
290 &self.repr_typecheck_ctx,
291 &mut df_meta,
292 Some(&mut self.metrics),
293 );
294 for id in self.force_source_non_monotonic.iter() {
296 if let Some((_desc, monotonic, _upper)) = df_desc.source_imports.get_mut(id) {
297 *monotonic = false;
298 }
299 }
300 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
302
303 if self.config.mode == OptimizeMode::Explain {
304 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
306 }
307
308 self.duration += time.elapsed();
309
310 Ok(GlobalMirPlan { df_desc, df_meta })
312 }
313}
314
315impl Optimize<GlobalMirPlan> for Optimizer {
316 type To = GlobalLirPlan;
317
318 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
319 let time = Instant::now();
320
321 let GlobalMirPlan {
322 mut df_desc,
323 df_meta,
324 } = plan;
325
326 for build in df_desc.objects_to_build.iter_mut() {
328 normalize_lets(&mut build.plan.0, &self.config.features)?
329 }
330
331 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
335
336 trace_plan(&df_desc);
338
339 self.duration += time.elapsed();
340 self.metrics
341 .observe_e2e_optimization_time("materialized_view", self.duration);
342
343 Ok(GlobalLirPlan { df_desc, df_meta })
345 }
346}
347
348impl GlobalLirPlan {
349 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
351 (self.df_desc, self.df_meta)
352 }
353}