1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::collections::CollectionExt;
21use mz_ore::soft_assert_or_log;
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::SubscribeFrom;
25use mz_transform::TransformCtx;
26use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use timely::progress::Antichain;
32
33use crate::CollectionIdBundle;
34use crate::optimize::dataflows::{
35 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
36 dataflow_import_id_bundle,
37};
38use crate::optimize::{
39 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
40 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
41};
42
43pub struct Optimizer {
44 repr_typecheck_ctx: ReprTypecheckContext,
46 catalog: Arc<dyn OptimizerCatalog>,
48 compute_instance: ComputeInstanceSnapshot,
50 sink_id: GlobalId,
52 view_id: GlobalId,
55 with_snapshot: bool,
57 up_to: Option<Timestamp>,
59 debug_name: String,
61 config: OptimizerConfig,
63 metrics: OptimizerMetrics,
65 duration: Duration,
67}
68
69impl std::fmt::Debug for Optimizer {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("Optimizer")
77 .field("config", &self.config)
78 .finish_non_exhaustive()
79 }
80}
81
82impl Optimizer {
83 pub fn new(
84 catalog: Arc<dyn OptimizerCatalog>,
85 compute_instance: ComputeInstanceSnapshot,
86 view_id: GlobalId,
87 sink_id: GlobalId,
88 with_snapshot: bool,
89 up_to: Option<Timestamp>,
90 debug_name: String,
91 config: OptimizerConfig,
92 metrics: OptimizerMetrics,
93 ) -> Self {
94 Self {
95 repr_typecheck_ctx: empty_repr_context(),
96 catalog,
97 compute_instance,
98 view_id,
99 sink_id,
100 with_snapshot,
101 up_to,
102 debug_name,
103 config,
104 metrics,
105 duration: Default::default(),
106 }
107 }
108
109 pub fn cluster_id(&self) -> ComputeInstanceId {
110 self.compute_instance.instance_id()
111 }
112
113 pub fn up_to(&self) -> Option<Timestamp> {
114 self.up_to.clone()
115 }
116
117 pub fn sink_id(&self) -> GlobalId {
118 self.sink_id
119 }
120}
121
122#[derive(Clone, Debug)]
128pub struct GlobalMirPlan<T: Clone> {
129 df_desc: MirDataflowDescription,
130 df_meta: DataflowMetainfo,
131 phantom: PhantomData<T>,
132}
133
134impl<T: Clone> GlobalMirPlan<T> {
135 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
137 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
138 }
139}
140
141#[derive(Clone, Debug)]
144pub struct GlobalLirPlan {
145 df_desc: LirDataflowDescription,
146 df_meta: DataflowMetainfo,
147}
148
149impl GlobalLirPlan {
150 pub fn sink_id(&self) -> GlobalId {
156 self.df_desc.sink_id()
157 }
158
159 pub fn as_of(&self) -> Option<Timestamp> {
160 self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
161 }
162
163 pub fn sink_desc(&self) -> &ComputeSinkDesc {
169 let sink_exports = &self.df_desc.sink_exports;
170 let sink_desc = sink_exports.values().into_element();
171 sink_desc
172 }
173}
174
/// Type-state marker for a [`GlobalMirPlan`] whose `as_of` has not been set
/// yet (see [`GlobalMirPlan::resolve`]).
#[derive(Debug, Clone)]
pub struct Unresolved;
179
/// Type-state marker for a [`GlobalMirPlan`] whose `as_of` and `until` have
/// been set by [`GlobalMirPlan::resolve`]; only such plans can be lowered to a
/// [`GlobalLirPlan`].
#[derive(Debug, Clone)]
pub struct Resolved;
187
188impl Optimize<SubscribeFrom> for Optimizer {
189 type To = GlobalMirPlan<Unresolved>;
190
191 fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
192 let time = Instant::now();
193
194 let mut df_builder = {
195 let compute = self.compute_instance.clone();
196 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
197 };
198 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
199 let mut df_meta = DataflowMetainfo::default();
200
201 match plan {
202 SubscribeFrom::Id(from_id) => {
203 let from = self.catalog.get_entry(&from_id);
204 let from_desc = from
205 .relation_desc()
206 .expect("subscribes can only be run on items with descs")
207 .into_owned();
208
209 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
210 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
211
212 let subscribe_conn = SubscribeSinkConnection::default();
214 let sink_description = ComputeSinkDesc {
215 from: from_id,
216 from_desc,
217 connection: ComputeSinkConnection::Subscribe(subscribe_conn),
218 with_snapshot: self.with_snapshot,
219 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
220 non_null_assertions: vec![],
222 refresh_schedule: None,
224 };
225 df_desc.export_sink(self.sink_id, sink_description);
226 }
227 SubscribeFrom::Query { expr, desc } => {
228 let mut transform_ctx = TransformCtx::local(
237 &self.config.features,
238 &self.repr_typecheck_ctx,
239 &mut df_meta,
240 Some(&mut self.metrics),
241 Some(self.view_id),
242 );
243 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
244
245 df_builder.import_view_into_dataflow(
246 &self.view_id,
247 &expr,
248 &mut df_desc,
249 &self.config.features,
250 )?;
251 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
252
253 let subscribe_conn = SubscribeSinkConnection::default();
255 let sink_description = ComputeSinkDesc {
256 from: self.view_id,
257 from_desc: desc.clone(),
258 connection: ComputeSinkConnection::Subscribe(subscribe_conn),
259 with_snapshot: self.with_snapshot,
260 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
261 non_null_assertions: vec![],
263 refresh_schedule: None,
265 };
266 df_desc.export_sink(self.sink_id, sink_description);
267 }
268 };
269
270 let style = ExprPrepMaintained;
272 df_desc.visit_children(
273 |r| style.prep_relation_expr(r),
274 |s| style.prep_scalar_expr(s),
275 )?;
276
277 let mut transform_ctx = TransformCtx::global(
279 &df_builder,
280 &mz_transform::EmptyStatisticsOracle, &self.config.features,
282 &self.repr_typecheck_ctx,
283 &mut df_meta,
284 Some(&mut self.metrics),
285 );
286 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
288
289 if self.config.mode == OptimizeMode::Explain {
290 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
292 }
293
294 self.duration += time.elapsed();
295
296 Ok(GlobalMirPlan {
298 df_desc,
299 df_meta,
300 phantom: PhantomData::<Unresolved>,
301 })
302 }
303}
304
305impl GlobalMirPlan<Unresolved> {
306 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
312 soft_assert_or_log!(
315 self.df_desc.index_exports.is_empty(),
316 "unexpectedly setting until for a DataflowDescription with an index",
317 );
318
319 self.df_desc.set_as_of(as_of);
321
322 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
326 for (_, sink) in &self.df_desc.sink_exports {
327 self.df_desc.until.join_assign(&sink.up_to);
328 }
329
330 GlobalMirPlan {
331 df_desc: self.df_desc,
332 df_meta: self.df_meta,
333 phantom: PhantomData::<Resolved>,
334 }
335 }
336}
337
338impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
339 type To = GlobalLirPlan;
340
341 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
342 let time = Instant::now();
343
344 let GlobalMirPlan {
345 mut df_desc,
346 df_meta,
347 phantom: _,
348 } = plan;
349
350 for build in df_desc.objects_to_build.iter_mut() {
352 normalize_lets(&mut build.plan.0, &self.config.features)?
353 }
354
355 if self.config.subscribe_snapshot_optimization {
356 optimize_dataflow_snapshot(&mut df_desc)?;
358 }
359
360 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
364
365 self.duration += time.elapsed();
366 self.metrics
367 .observe_e2e_optimization_time("subscribe", self.duration);
368
369 Ok(GlobalLirPlan { df_desc, df_meta })
371 }
372}
373
374impl GlobalLirPlan {
375 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
377 (self.df_desc, self.df_meta)
378 }
379}