1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::collections::CollectionExt;
21use mz_ore::soft_assert_or_log;
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::SubscribeFrom;
25use mz_transform::TransformCtx;
26use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
29use timely::progress::Antichain;
30
31use crate::CollectionIdBundle;
32use crate::optimize::dataflows::{
33 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
34 dataflow_import_id_bundle,
35};
36use crate::optimize::{
37 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
38 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
39};
40
41pub struct Optimizer {
42 typecheck_ctx: SharedTypecheckingContext,
44 catalog: Arc<dyn OptimizerCatalog>,
46 compute_instance: ComputeInstanceSnapshot,
48 sink_id: GlobalId,
50 view_id: GlobalId,
53 with_snapshot: bool,
55 up_to: Option<Timestamp>,
57 debug_name: String,
59 config: OptimizerConfig,
61 metrics: OptimizerMetrics,
63 duration: Duration,
65}
66
67impl std::fmt::Debug for Optimizer {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct("Optimizer")
75 .field("config", &self.config)
76 .finish_non_exhaustive()
77 }
78}
79
80impl Optimizer {
81 pub fn new(
82 catalog: Arc<dyn OptimizerCatalog>,
83 compute_instance: ComputeInstanceSnapshot,
84 view_id: GlobalId,
85 sink_id: GlobalId,
86 with_snapshot: bool,
87 up_to: Option<Timestamp>,
88 debug_name: String,
89 config: OptimizerConfig,
90 metrics: OptimizerMetrics,
91 ) -> Self {
92 Self {
93 typecheck_ctx: empty_typechecking_context(),
94 catalog,
95 compute_instance,
96 view_id,
97 sink_id,
98 with_snapshot,
99 up_to,
100 debug_name,
101 config,
102 metrics,
103 duration: Default::default(),
104 }
105 }
106
107 pub fn cluster_id(&self) -> ComputeInstanceId {
108 self.compute_instance.instance_id()
109 }
110
111 pub fn up_to(&self) -> Option<Timestamp> {
112 self.up_to.clone()
113 }
114
115 pub fn sink_id(&self) -> GlobalId {
116 self.sink_id
117 }
118}
119
120#[derive(Clone, Debug)]
126pub struct GlobalMirPlan<T: Clone> {
127 df_desc: MirDataflowDescription,
128 df_meta: DataflowMetainfo,
129 phantom: PhantomData<T>,
130}
131
132impl<T: Clone> GlobalMirPlan<T> {
133 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
135 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
136 }
137}
138
139#[derive(Clone, Debug)]
142pub struct GlobalLirPlan {
143 df_desc: LirDataflowDescription,
144 df_meta: DataflowMetainfo,
145}
146
147impl GlobalLirPlan {
148 pub fn sink_id(&self) -> GlobalId {
154 self.df_desc.sink_id()
155 }
156
157 pub fn as_of(&self) -> Option<Timestamp> {
158 self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
159 }
160
161 pub fn sink_desc(&self) -> &ComputeSinkDesc {
167 let sink_exports = &self.df_desc.sink_exports;
168 let sink_desc = sink_exports.values().into_element();
169 sink_desc
170 }
171}
172
/// Type-state marker for a `GlobalMirPlan` that has not been passed through
/// `GlobalMirPlan::resolve` yet, i.e. whose `as_of` has not been set by it.
#[derive(Clone, Debug)]
pub struct Unresolved;
177
/// Type-state marker for a `GlobalMirPlan` produced by
/// `GlobalMirPlan::resolve`, i.e. one whose `as_of` and `until` have been
/// set and which can be handed to the LIR stage.
#[derive(Clone, Debug)]
pub struct Resolved;
185
186impl Optimize<SubscribeFrom> for Optimizer {
187 type To = GlobalMirPlan<Unresolved>;
188
189 fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
190 let time = Instant::now();
191
192 let mut df_builder = {
193 let compute = self.compute_instance.clone();
194 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
195 };
196 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
197 let mut df_meta = DataflowMetainfo::default();
198
199 match plan {
200 SubscribeFrom::Id(from_id) => {
201 let from = self.catalog.get_entry(&from_id);
202 let from_desc = from
203 .relation_desc()
204 .expect("subscribes can only be run on items with descs")
205 .into_owned();
206
207 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
208 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
209
210 let subscribe_conn = SubscribeSinkConnection::default();
212 let sink_description = ComputeSinkDesc {
213 from: from_id,
214 from_desc,
215 connection: ComputeSinkConnection::Subscribe(subscribe_conn),
216 with_snapshot: self.with_snapshot,
217 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
218 non_null_assertions: vec![],
220 refresh_schedule: None,
222 };
223 df_desc.export_sink(self.sink_id, sink_description);
224 }
225 SubscribeFrom::Query { expr, desc } => {
226 let mut transform_ctx = TransformCtx::local(
235 &self.config.features,
236 &self.typecheck_ctx,
237 &mut df_meta,
238 Some(&mut self.metrics),
239 Some(self.view_id),
240 );
241 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
242
243 df_builder.import_view_into_dataflow(
244 &self.view_id,
245 &expr,
246 &mut df_desc,
247 &self.config.features,
248 )?;
249 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
250
251 let subscribe_conn = SubscribeSinkConnection::default();
253 let sink_description = ComputeSinkDesc {
254 from: self.view_id,
255 from_desc: desc.clone(),
256 connection: ComputeSinkConnection::Subscribe(subscribe_conn),
257 with_snapshot: self.with_snapshot,
258 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
259 non_null_assertions: vec![],
261 refresh_schedule: None,
263 };
264 df_desc.export_sink(self.sink_id, sink_description);
265 }
266 };
267
268 let style = ExprPrepMaintained;
270 df_desc.visit_children(
271 |r| style.prep_relation_expr(r),
272 |s| style.prep_scalar_expr(s),
273 )?;
274
275 let mut transform_ctx = TransformCtx::global(
277 &df_builder,
278 &mz_transform::EmptyStatisticsOracle, &self.config.features,
280 &self.typecheck_ctx,
281 &mut df_meta,
282 Some(&mut self.metrics),
283 );
284 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
286
287 if self.config.mode == OptimizeMode::Explain {
288 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
290 }
291
292 self.duration += time.elapsed();
293
294 Ok(GlobalMirPlan {
296 df_desc,
297 df_meta,
298 phantom: PhantomData::<Unresolved>,
299 })
300 }
301}
302
303impl GlobalMirPlan<Unresolved> {
304 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
310 soft_assert_or_log!(
313 self.df_desc.index_exports.is_empty(),
314 "unexpectedly setting until for a DataflowDescription with an index",
315 );
316
317 self.df_desc.set_as_of(as_of);
319
320 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
324 for (_, sink) in &self.df_desc.sink_exports {
325 self.df_desc.until.join_assign(&sink.up_to);
326 }
327
328 GlobalMirPlan {
329 df_desc: self.df_desc,
330 df_meta: self.df_meta,
331 phantom: PhantomData::<Resolved>,
332 }
333 }
334}
335
336impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
337 type To = GlobalLirPlan;
338
339 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
340 let time = Instant::now();
341
342 let GlobalMirPlan {
343 mut df_desc,
344 df_meta,
345 phantom: _,
346 } = plan;
347
348 for build in df_desc.objects_to_build.iter_mut() {
350 normalize_lets(&mut build.plan.0, &self.config.features)?
351 }
352
353 if self.config.subscribe_snapshot_optimization {
354 optimize_dataflow_snapshot(&mut df_desc)?;
356 }
357
358 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
362
363 self.duration += time.elapsed();
364 self.metrics
365 .observe_e2e_optimization_time("subscribe", self.duration);
366
367 Ok(GlobalLirPlan { df_desc, df_meta })
369 }
370}
371
372impl GlobalLirPlan {
373 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
375 (self.df_desc, self.df_meta)
376 }
377}