1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::collections::CollectionExt;
21use mz_ore::soft_assert_or_log;
22use mz_repr::{GlobalId, RelationDesc, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::SubscribeFrom;
25use mz_transform::TransformCtx;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use timely::progress::Antichain;
32
33use crate::CollectionIdBundle;
34use crate::optimize::dataflows::{
35 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, dataflow_import_id_bundle,
36 prep_relation_expr, prep_scalar_expr,
37};
38use crate::optimize::{
39 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
40 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
41};
42
/// Optimizer for `SUBSCRIBE` statements.
///
/// Drives two stages: `SubscribeFrom` -> `GlobalMirPlan<Unresolved>` and,
/// after `GlobalMirPlan::resolve`, `GlobalMirPlan<Resolved>` -> `GlobalLirPlan`.
pub struct Optimizer {
    /// Typechecking context handed to every `TransformCtx` created by this optimizer.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog used to look up the subscribed-to item and to build the dataflow.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Snapshot of the target compute instance; its id is reported by `cluster_id`.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the subscribe sink is exported from the dataflow.
    sink_id: GlobalId,
    /// Id under which a `SubscribeFrom::Query` expression is imported as a view.
    view_id: GlobalId,
    /// Copied into the sink description's `with_snapshot` field
    /// (presumably: whether the initial snapshot is emitted — confirm).
    with_snapshot: bool,
    /// Optional bound copied into the sink's `up_to` frontier; `None` becomes
    /// the empty frontier.
    up_to: Option<Timestamp>,
    /// Name given to the built dataflow description.
    debug_name: String,
    /// Optimizer configuration (feature flags, optimize mode, ...).
    config: OptimizerConfig,
    /// Optimizer metrics; the end-to-end optimization time is reported here.
    metrics: OptimizerMetrics,
    /// Time spent so far across all optimization stages run by this instance.
    duration: Duration,
}
68
69impl std::fmt::Debug for Optimizer {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("Optimizer")
77 .field("config", &self.config)
78 .finish_non_exhaustive()
79 }
80}
81
82impl Optimizer {
83 pub fn new(
84 catalog: Arc<dyn OptimizerCatalog>,
85 compute_instance: ComputeInstanceSnapshot,
86 view_id: GlobalId,
87 sink_id: GlobalId,
88 with_snapshot: bool,
89 up_to: Option<Timestamp>,
90 debug_name: String,
91 config: OptimizerConfig,
92 metrics: OptimizerMetrics,
93 ) -> Self {
94 Self {
95 repr_typecheck_ctx: empty_repr_context(),
96 catalog,
97 compute_instance,
98 view_id,
99 sink_id,
100 with_snapshot,
101 up_to,
102 debug_name,
103 config,
104 metrics,
105 duration: Default::default(),
106 }
107 }
108
109 pub fn cluster_id(&self) -> ComputeInstanceId {
110 self.compute_instance.instance_id()
111 }
112
113 pub fn up_to(&self) -> Option<Timestamp> {
114 self.up_to.clone()
115 }
116}
117
/// The MIR dataflow produced by the first (global MIR) optimization stage.
///
/// `T` is a typestate marker: `Unresolved` until `resolve` has set the
/// dataflow's `as_of`/`until`, then `Resolved`. Only a `Resolved` plan can be
/// lowered to LIR.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    /// The optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metainformation collected while optimizing the dataflow.
    df_meta: DataflowMetainfo,
    /// Zero-sized carrier of the typestate marker `T`.
    phantom: PhantomData<T>,
}
129
130impl<T: Clone> GlobalMirPlan<T> {
131 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
133 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
134 }
135}
136
/// The final LIR dataflow, produced by lowering a `GlobalMirPlan<Resolved>`.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metainformation carried over from the MIR stage.
    df_meta: DataflowMetainfo,
}
144
145impl GlobalLirPlan {
146 pub fn sink_id(&self) -> GlobalId {
152 self.df_desc.sink_id()
153 }
154
155 pub fn as_of(&self) -> Option<Timestamp> {
156 self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
157 }
158
159 pub fn sink_desc(&self) -> &ComputeSinkDesc {
165 let sink_exports = &self.df_desc.sink_exports;
166 let sink_desc = sink_exports.values().into_element();
167 sink_desc
168 }
169}
170
/// Typestate marker for a `GlobalMirPlan` whose `as_of` has not been set yet;
/// call `GlobalMirPlan::<Unresolved>::resolve` to move to `Resolved`.
#[derive(Clone, Debug)]
pub struct Unresolved;
175
/// Typestate marker for a `GlobalMirPlan` whose `as_of` and `until` have been
/// set by `resolve`; such a plan can be lowered to a `GlobalLirPlan`.
#[derive(Clone, Debug)]
pub struct Resolved;
183
184impl Optimize<SubscribeFrom> for Optimizer {
185 type To = GlobalMirPlan<Unresolved>;
186
187 fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
188 let time = Instant::now();
189
190 let mut df_builder = {
191 let compute = self.compute_instance.clone();
192 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
193 };
194 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
195 let mut df_meta = DataflowMetainfo::default();
196
197 match plan {
198 SubscribeFrom::Id(from_id) => {
199 let from = self.catalog.get_entry(&from_id);
200 let from_desc = from
201 .relation_desc()
202 .expect("subscribes can only be run on items with descs")
203 .into_owned();
204
205 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
206 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
207
208 let sink_description = ComputeSinkDesc {
210 from: from_id,
211 from_desc,
212 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
213 with_snapshot: self.with_snapshot,
214 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
215 non_null_assertions: vec![],
217 refresh_schedule: None,
219 };
220 df_desc.export_sink(self.sink_id, sink_description);
221 }
222 SubscribeFrom::Query { expr, desc } => {
223 let mut transform_ctx = TransformCtx::local(
231 &self.config.features,
232 &self.repr_typecheck_ctx,
233 &mut df_meta,
234 Some(&mut self.metrics),
235 Some(self.view_id),
236 );
237 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
238
239 df_builder.import_view_into_dataflow(
240 &self.view_id,
241 &expr,
242 &mut df_desc,
243 &self.config.features,
244 )?;
245 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
246
247 let sink_description = ComputeSinkDesc {
249 from: self.view_id,
250 from_desc: RelationDesc::new(expr.typ(), desc.iter_names()),
251 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
252 with_snapshot: self.with_snapshot,
253 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
254 non_null_assertions: vec![],
256 refresh_schedule: None,
258 };
259 df_desc.export_sink(self.sink_id, sink_description);
260 }
261 };
262
263 let style = ExprPrepStyle::Maintained;
265 df_desc.visit_children(
266 |r| prep_relation_expr(r, style),
267 |s| prep_scalar_expr(s, style),
268 )?;
269
270 let mut transform_ctx = TransformCtx::global(
272 &df_builder,
273 &mz_transform::EmptyStatisticsOracle, &self.config.features,
275 &self.repr_typecheck_ctx,
276 &mut df_meta,
277 Some(&mut self.metrics),
278 );
279 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
281
282 if self.config.mode == OptimizeMode::Explain {
283 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
285 }
286
287 self.duration += time.elapsed();
288
289 Ok(GlobalMirPlan {
291 df_desc,
292 df_meta,
293 phantom: PhantomData::<Unresolved>,
294 })
295 }
296}
297
298impl GlobalMirPlan<Unresolved> {
299 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
305 soft_assert_or_log!(
308 self.df_desc.index_exports.is_empty(),
309 "unexpectedly setting until for a DataflowDescription with an index",
310 );
311
312 self.df_desc.set_as_of(as_of);
314
315 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
319 for (_, sink) in &self.df_desc.sink_exports {
320 self.df_desc.until.join_assign(&sink.up_to);
321 }
322
323 GlobalMirPlan {
324 df_desc: self.df_desc,
325 df_meta: self.df_meta,
326 phantom: PhantomData::<Resolved>,
327 }
328 }
329}
330
331impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
332 type To = GlobalLirPlan;
333
334 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
335 let time = Instant::now();
336
337 let GlobalMirPlan {
338 mut df_desc,
339 df_meta,
340 phantom: _,
341 } = plan;
342
343 for build in df_desc.objects_to_build.iter_mut() {
345 normalize_lets(&mut build.plan.0, &self.config.features)?
346 }
347
348 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
352
353 self.duration += time.elapsed();
354 self.metrics
355 .observe_e2e_optimization_time("subscribe", self.duration);
356
357 Ok(GlobalLirPlan { df_desc, df_meta })
359 }
360}
361
362impl GlobalLirPlan {
363 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
365 (self.df_desc, self.df_meta)
366 }
367}