1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::collections::CollectionExt;
21use mz_ore::soft_assert_or_log;
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::SubscribeFrom;
25use mz_transform::TransformCtx;
26use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use timely::progress::Antichain;
32
33use crate::CollectionIdBundle;
34use crate::optimize::dataflows::{
35 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
36 dataflow_import_id_bundle,
37};
38use crate::optimize::{
39 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
40 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
41};
42
43pub struct Optimizer {
44 repr_typecheck_ctx: ReprTypecheckContext,
46 catalog: Arc<dyn OptimizerCatalog>,
48 compute_instance: ComputeInstanceSnapshot,
50 sink_id: GlobalId,
52 view_id: GlobalId,
55 with_snapshot: bool,
57 up_to: Option<Timestamp>,
59 debug_name: String,
61 config: OptimizerConfig,
63 metrics: OptimizerMetrics,
65 duration: Duration,
67}
68
69impl std::fmt::Debug for Optimizer {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("Optimizer")
77 .field("config", &self.config)
78 .finish_non_exhaustive()
79 }
80}
81
82impl Optimizer {
83 pub fn new(
84 catalog: Arc<dyn OptimizerCatalog>,
85 compute_instance: ComputeInstanceSnapshot,
86 view_id: GlobalId,
87 sink_id: GlobalId,
88 with_snapshot: bool,
89 up_to: Option<Timestamp>,
90 debug_name: String,
91 config: OptimizerConfig,
92 metrics: OptimizerMetrics,
93 ) -> Self {
94 Self {
95 repr_typecheck_ctx: empty_repr_context(),
96 catalog,
97 compute_instance,
98 view_id,
99 sink_id,
100 with_snapshot,
101 up_to,
102 debug_name,
103 config,
104 metrics,
105 duration: Default::default(),
106 }
107 }
108
109 pub fn cluster_id(&self) -> ComputeInstanceId {
110 self.compute_instance.instance_id()
111 }
112
113 pub fn up_to(&self) -> Option<Timestamp> {
114 self.up_to.clone()
115 }
116}
117
118#[derive(Clone, Debug)]
124pub struct GlobalMirPlan<T: Clone> {
125 df_desc: MirDataflowDescription,
126 df_meta: DataflowMetainfo,
127 phantom: PhantomData<T>,
128}
129
130impl<T: Clone> GlobalMirPlan<T> {
131 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
133 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
134 }
135}
136
137#[derive(Clone, Debug)]
140pub struct GlobalLirPlan {
141 df_desc: LirDataflowDescription,
142 df_meta: DataflowMetainfo,
143}
144
145impl GlobalLirPlan {
146 pub fn sink_id(&self) -> GlobalId {
152 self.df_desc.sink_id()
153 }
154
155 pub fn as_of(&self) -> Option<Timestamp> {
156 self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
157 }
158
159 pub fn sink_desc(&self) -> &ComputeSinkDesc {
165 let sink_exports = &self.df_desc.sink_exports;
166 let sink_desc = sink_exports.values().into_element();
167 sink_desc
168 }
169}
170
/// Typestate marker for a [`GlobalMirPlan`] whose dataflow timestamps have
/// not yet been set by `resolve`.
#[derive(Debug, Clone)]
pub struct Unresolved;
175
/// Typestate marker for a [`GlobalMirPlan`] whose dataflow `as_of` and
/// `until` have been set by `resolve`.
#[derive(Debug, Clone)]
pub struct Resolved;
183
184impl Optimize<SubscribeFrom> for Optimizer {
185 type To = GlobalMirPlan<Unresolved>;
186
187 fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
188 let time = Instant::now();
189
190 let mut df_builder = {
191 let compute = self.compute_instance.clone();
192 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
193 };
194 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
195 let mut df_meta = DataflowMetainfo::default();
196
197 match plan {
198 SubscribeFrom::Id(from_id) => {
199 let from = self.catalog.get_entry(&from_id);
200 let from_desc = from
201 .relation_desc()
202 .expect("subscribes can only be run on items with descs")
203 .into_owned();
204
205 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
206 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
207
208 let sink_description = ComputeSinkDesc {
210 from: from_id,
211 from_desc,
212 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
213 with_snapshot: self.with_snapshot,
214 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
215 non_null_assertions: vec![],
217 refresh_schedule: None,
219 };
220 df_desc.export_sink(self.sink_id, sink_description);
221 }
222 SubscribeFrom::Query { expr, desc } => {
223 let mut transform_ctx = TransformCtx::local(
232 &self.config.features,
233 &self.repr_typecheck_ctx,
234 &mut df_meta,
235 Some(&mut self.metrics),
236 Some(self.view_id),
237 );
238 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
239
240 df_builder.import_view_into_dataflow(
241 &self.view_id,
242 &expr,
243 &mut df_desc,
244 &self.config.features,
245 )?;
246 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
247
248 let sink_description = ComputeSinkDesc {
250 from: self.view_id,
251 from_desc: desc.clone(),
252 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
253 with_snapshot: self.with_snapshot,
254 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
255 non_null_assertions: vec![],
257 refresh_schedule: None,
259 };
260 df_desc.export_sink(self.sink_id, sink_description);
261 }
262 };
263
264 let style = ExprPrepMaintained;
266 df_desc.visit_children(
267 |r| style.prep_relation_expr(r),
268 |s| style.prep_scalar_expr(s),
269 )?;
270
271 let mut transform_ctx = TransformCtx::global(
273 &df_builder,
274 &mz_transform::EmptyStatisticsOracle, &self.config.features,
276 &self.repr_typecheck_ctx,
277 &mut df_meta,
278 Some(&mut self.metrics),
279 );
280 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
282
283 if self.config.mode == OptimizeMode::Explain {
284 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
286 }
287
288 self.duration += time.elapsed();
289
290 Ok(GlobalMirPlan {
292 df_desc,
293 df_meta,
294 phantom: PhantomData::<Unresolved>,
295 })
296 }
297}
298
299impl GlobalMirPlan<Unresolved> {
300 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
306 soft_assert_or_log!(
309 self.df_desc.index_exports.is_empty(),
310 "unexpectedly setting until for a DataflowDescription with an index",
311 );
312
313 self.df_desc.set_as_of(as_of);
315
316 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
320 for (_, sink) in &self.df_desc.sink_exports {
321 self.df_desc.until.join_assign(&sink.up_to);
322 }
323
324 GlobalMirPlan {
325 df_desc: self.df_desc,
326 df_meta: self.df_meta,
327 phantom: PhantomData::<Resolved>,
328 }
329 }
330}
331
332impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
333 type To = GlobalLirPlan;
334
335 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
336 let time = Instant::now();
337
338 let GlobalMirPlan {
339 mut df_desc,
340 df_meta,
341 phantom: _,
342 } = plan;
343
344 for build in df_desc.objects_to_build.iter_mut() {
346 normalize_lets(&mut build.plan.0, &self.config.features)?
347 }
348
349 if self.config.subscribe_snapshot_optimization {
350 optimize_dataflow_snapshot(&mut df_desc)?;
352 }
353
354 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
358
359 self.duration += time.elapsed();
360 self.metrics
361 .observe_e2e_optimization_time("subscribe", self.duration);
362
363 Ok(GlobalLirPlan { df_desc, df_meta })
365 }
366}
367
368impl GlobalLirPlan {
369 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
371 (self.df_desc, self.df_meta)
372 }
373}