1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::soft_assert_or_log;
21use mz_repr::{GlobalId, Timestamp};
22use mz_sql::optimizer_metrics::OptimizerMetrics;
23use mz_sql::plan::{HirToMirConfig, SubscribeFrom, SubscribePlan};
24use mz_transform::TransformCtx;
25use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
26use mz_transform::normalize_lets::normalize_lets;
27use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
28use timely::progress::Antichain;
29
30use crate::CollectionIdBundle;
31use crate::optimize::dataflows::{
32 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
33 dataflow_import_id_bundle,
34};
35use crate::optimize::{
36 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
37 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
38};
39
/// Optimizer for `SUBSCRIBE` statements.
///
/// Optimization runs in two `Optimize` stages: `SubscribePlan` ⇒
/// `GlobalMirPlan<Unresolved>`, and — after `GlobalMirPlan::resolve` has fixed
/// the timestamps — `GlobalMirPlan<Resolved>` ⇒ `GlobalLirPlan`.
pub struct Optimizer {
    /// Typechecking context handed to both the local and the global MIR
    /// transform contexts.
    typecheck_ctx: SharedTypecheckingContext,
    /// Catalog used to look up the subscribed item and to build the dataflow.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Snapshot of the compute instance the dataflow is planned for.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the subscribe sink is exported from the dataflow.
    sink_id: GlobalId,
    /// Id given to the view built for a `SubscribeFrom::Query` input.
    view_id: GlobalId,
    /// Copied into the sink description's `with_snapshot` setting.
    with_snapshot: bool,
    /// Optional upper bound copied into the sink's `up_to`; `None` becomes the
    /// default (empty) antichain.
    up_to: Option<Timestamp>,
    /// Human-readable name given to the MIR dataflow description.
    debug_name: String,
    /// Optimizer configuration (feature flags, optimize mode, and the
    /// subscribe snapshot-optimization switch).
    config: OptimizerConfig,
    /// Metrics handed to the transform contexts and used to report the
    /// end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Time spent so far in successfully completed optimization stages,
    /// accumulated across both stages.
    duration: Duration,
}
65
66impl std::fmt::Debug for Optimizer {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("Optimizer")
74 .field("config", &self.config)
75 .finish_non_exhaustive()
76 }
77}
78
79impl Optimizer {
80 pub fn new(
81 catalog: Arc<dyn OptimizerCatalog>,
82 compute_instance: ComputeInstanceSnapshot,
83 view_id: GlobalId,
84 sink_id: GlobalId,
85 with_snapshot: bool,
86 up_to: Option<Timestamp>,
87 debug_name: String,
88 config: OptimizerConfig,
89 metrics: OptimizerMetrics,
90 ) -> Self {
91 Self {
92 typecheck_ctx: empty_typechecking_context(),
93 catalog,
94 compute_instance,
95 view_id,
96 sink_id,
97 with_snapshot,
98 up_to,
99 debug_name,
100 config,
101 metrics,
102 duration: Default::default(),
103 }
104 }
105
106 pub fn cluster_id(&self) -> ComputeInstanceId {
107 self.compute_instance.instance_id()
108 }
109
110 pub fn up_to(&self) -> Option<Timestamp> {
111 self.up_to.clone()
112 }
113
114 pub fn sink_id(&self) -> GlobalId {
115 self.sink_id
116 }
117}
118
/// Result of the first (`SubscribePlan` ⇒ MIR) optimization stage.
///
/// `T` is a state marker (`Unresolved` or `Resolved`) that records whether
/// `resolve` has already set the dataflow's timestamps; it is only carried in
/// `PhantomData` and has no runtime representation.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    /// The globally optimized MIR dataflow, including the subscribe sink.
    df_desc: MirDataflowDescription,
    /// Metadata collected by the transforms while optimizing `df_desc`.
    df_meta: DataflowMetainfo,
    /// Zero-sized marker for the resolution state `T`.
    phantom: PhantomData<T>,
}
130
131impl<T: Clone> GlobalMirPlan<T> {
132 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
134 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
135 }
136}
137
/// Result of the final (MIR ⇒ LIR) optimization stage.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The dataflow after `Plan::finalize_dataflow` lowered it to LIR.
    df_desc: LirDataflowDescription,
    /// Metadata carried over unchanged from the MIR stage.
    df_meta: DataflowMetainfo,
}
145
146impl GlobalLirPlan {
147 pub fn sink_id(&self) -> GlobalId {
153 self.df_desc.sink_id()
154 }
155}
156
/// Marker for a [`GlobalMirPlan`] whose dataflow timestamps (`as_of`,
/// `until`) have not been set yet. Produced by the first optimization stage.
#[derive(Clone, Debug)]
pub struct Unresolved;

/// Marker for a [`GlobalMirPlan`] whose dataflow timestamps have been set by
/// `GlobalMirPlan::<Unresolved>::resolve`. Only plans in this state can be fed
/// into the MIR ⇒ LIR optimization stage.
#[derive(Clone, Debug)]
pub struct Resolved;
169
170impl Optimize<SubscribePlan> for Optimizer {
171 type To = GlobalMirPlan<Unresolved>;
172
173 fn optimize(&mut self, plan: SubscribePlan) -> Result<Self::To, OptimizerError> {
174 let output = plan.output;
175 let plan = plan.from;
176 let time = Instant::now();
177
178 let mut df_builder = {
179 let compute = self.compute_instance.clone();
180 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
181 };
182 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
183 let mut df_meta = DataflowMetainfo::default();
184
185 match plan {
186 SubscribeFrom::Id(from_id) => {
187 let from = self.catalog.get_entry(&from_id);
188 let from_desc = from
189 .relation_desc()
190 .expect("subscribes can only be run on items with descs")
191 .into_owned();
192
193 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
194 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
195
196 let sink_description = ComputeSinkDesc {
198 from: from_id,
199 from_desc,
200 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection {
201 output: output.row_order().to_vec(),
202 }),
203 with_snapshot: self.with_snapshot,
204 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
205 non_null_assertions: vec![],
207 refresh_schedule: None,
209 };
210 df_desc.export_sink(self.sink_id, sink_description);
211 }
212 SubscribeFrom::Query { expr, desc } => {
213 let mut transform_ctx = TransformCtx::local(
222 &self.config.features,
223 &self.typecheck_ctx,
224 &mut df_meta,
225 Some(&mut self.metrics),
226 Some(self.view_id),
227 );
228
229 let expr = expr.lower(HirToMirConfig::from(&self.config), None)?;
230 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
231
232 df_builder.import_view_into_dataflow(
233 &self.view_id,
234 &expr,
235 &mut df_desc,
236 &self.config.features,
237 )?;
238 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
239
240 let sink_description = ComputeSinkDesc {
242 from: self.view_id,
243 from_desc: desc.clone(),
244 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection {
245 output: output.row_order().to_vec(),
246 }),
247 with_snapshot: self.with_snapshot,
248 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
249 non_null_assertions: vec![],
251 refresh_schedule: None,
253 };
254 df_desc.export_sink(self.sink_id, sink_description);
255 }
256 };
257
258 let style = ExprPrepMaintained;
260 df_desc.visit_children(
261 |r| style.prep_relation_expr(r),
262 |s| style.prep_scalar_expr(s),
263 )?;
264
265 let mut transform_ctx = TransformCtx::global(
267 &df_builder,
268 &mz_transform::EmptyStatisticsOracle, &self.config.features,
270 &self.typecheck_ctx,
271 &mut df_meta,
272 Some(&mut self.metrics),
273 );
274 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
276
277 if self.config.mode == OptimizeMode::Explain {
278 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
280 }
281
282 self.duration += time.elapsed();
283
284 Ok(GlobalMirPlan {
286 df_desc,
287 df_meta,
288 phantom: PhantomData::<Unresolved>,
289 })
290 }
291}
292
293impl GlobalMirPlan<Unresolved> {
294 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
300 soft_assert_or_log!(
303 self.df_desc.index_exports.is_empty(),
304 "unexpectedly setting until for a DataflowDescription with an index",
305 );
306
307 self.df_desc.set_as_of(as_of);
309
310 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
314 for (_, sink) in &self.df_desc.sink_exports {
315 self.df_desc.until.join_assign(&sink.up_to);
316 }
317
318 GlobalMirPlan {
319 df_desc: self.df_desc,
320 df_meta: self.df_meta,
321 phantom: PhantomData::<Resolved>,
322 }
323 }
324}
325
326impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
327 type To = GlobalLirPlan;
328
329 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
330 let time = Instant::now();
331
332 let GlobalMirPlan {
333 mut df_desc,
334 df_meta,
335 phantom: _,
336 } = plan;
337
338 for build in df_desc.objects_to_build.iter_mut() {
340 normalize_lets(&mut build.plan.0, &self.config.features)?
341 }
342
343 if self.config.subscribe_snapshot_optimization {
344 optimize_dataflow_snapshot(&mut df_desc)?;
346 }
347
348 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
352
353 self.duration += time.elapsed();
354 self.metrics
355 .observe_e2e_optimization_time("subscribe", self.duration);
356
357 Ok(GlobalLirPlan { df_desc, df_meta })
359 }
360}
361
362impl GlobalLirPlan {
363 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
365 (self.df_desc, self.df_meta)
366 }
367}