1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_adapter_types::connection::ConnectionId;
18use mz_compute_types::ComputeInstanceId;
19use mz_compute_types::plan::Plan;
20use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_or_log;
23use mz_repr::{GlobalId, RelationDesc, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::SubscribeFrom;
26use mz_transform::TransformCtx;
27use mz_transform::dataflow::DataflowMetainfo;
28use mz_transform::normalize_lets::normalize_lets;
29use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
30use timely::progress::Antichain;
31
32use crate::CollectionIdBundle;
33use crate::optimize::dataflows::{
34 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, dataflow_import_id_bundle,
35 prep_relation_expr, prep_scalar_expr,
36};
37use crate::optimize::{
38 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
39 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
40};
41
/// Optimizer for `SUBSCRIBE` statements.
///
/// Optimization runs in two stages, each exposed through an [`Optimize`] impl:
/// 1. `SubscribeFrom` → [`GlobalMirPlan<Unresolved>`]: builds the MIR dataflow with a
///    single subscribe sink and runs the global MIR transforms.
/// 2. [`GlobalMirPlan<Resolved>`] → [`GlobalLirPlan`]: lowers the resolved plan to LIR.
///
/// The time spent in both stages is accumulated in `duration` and reported by the
/// second stage.
pub struct Optimizer {
    /// Typechecking context shared by the local and global MIR transforms.
    typecheck_ctx: TypecheckContext,
    /// Catalog used to look up entries, resolve names, and build the dataflow.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Snapshot of the compute instance the dataflow is built against.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the subscribe sink is exported.
    sink_id: GlobalId,
    /// Id of the view built from the query when subscribing to a query
    /// (`SubscribeFrom::Query`); not used for `SubscribeFrom::Id`.
    view_id: GlobalId,
    /// Connection used when resolving the full name of the subscribed-to item.
    conn_id: Option<ConnectionId>,
    /// Copied into the sink's `with_snapshot` flag.
    with_snapshot: bool,
    /// Optional upper bound. When set, the sink's `up_to` frontier contains exactly
    /// this timestamp; otherwise it is the empty (default) antichain.
    up_to: Option<Timestamp>,
    /// Name given to the dataflow description.
    debug_name: String,
    /// Optimizer configuration, including feature flags.
    config: OptimizerConfig,
    /// Optimizer metrics. Passed to the MIR transforms and used to record the
    /// end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Time spent so far across the optimization stages.
    duration: Duration,
}
69
70impl std::fmt::Debug for Optimizer {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("Optimizer")
78 .field("config", &self.config)
79 .finish_non_exhaustive()
80 }
81}
82
83impl Optimizer {
84 pub fn new(
85 catalog: Arc<dyn OptimizerCatalog>,
86 compute_instance: ComputeInstanceSnapshot,
87 view_id: GlobalId,
88 sink_id: GlobalId,
89 conn_id: Option<ConnectionId>,
90 with_snapshot: bool,
91 up_to: Option<Timestamp>,
92 debug_name: String,
93 config: OptimizerConfig,
94 metrics: OptimizerMetrics,
95 ) -> Self {
96 Self {
97 typecheck_ctx: empty_context(),
98 catalog,
99 compute_instance,
100 view_id,
101 sink_id,
102 conn_id,
103 with_snapshot,
104 up_to,
105 debug_name,
106 config,
107 metrics,
108 duration: Default::default(),
109 }
110 }
111
112 pub fn cluster_id(&self) -> ComputeInstanceId {
113 self.compute_instance.instance_id()
114 }
115
116 pub fn up_to(&self) -> Option<Timestamp> {
117 self.up_to.clone()
118 }
119}
120
/// A globally optimized MIR dataflow for a `SUBSCRIBE`.
///
/// `T` is a typestate marker. The MIR stage produces [`Unresolved`] plans, and
/// [`GlobalMirPlan::resolve`] turns them into [`Resolved`] plans with `as_of` and
/// `until` set. Only resolved plans are accepted by the LIR lowering stage.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    /// The MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metainformation collected while transforming the dataflow.
    df_meta: DataflowMetainfo,
    /// Zero-sized marker recording the resolution state `T`.
    phantom: PhantomData<T>,
}
132
133impl<T: Clone> GlobalMirPlan<T> {
134 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
136 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
137 }
138}
139
/// The LIR dataflow for a `SUBSCRIBE`, produced from a resolved
/// [`GlobalMirPlan`] by `Plan::finalize_dataflow`.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metainformation carried over from MIR optimization.
    df_meta: DataflowMetainfo,
}
147
148impl GlobalLirPlan {
149 pub fn sink_id(&self) -> GlobalId {
150 let sink_exports = &self.df_desc.sink_exports;
151 let sink_id = sink_exports.keys().next().expect("valid sink");
152 *sink_id
153 }
154
155 pub fn as_of(&self) -> Option<Timestamp> {
156 self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
157 }
158
159 pub fn sink_desc(&self) -> &ComputeSinkDesc {
160 let sink_exports = &self.df_desc.sink_exports;
161 let sink_desc = sink_exports.values().next().expect("valid sink");
162 sink_desc
163 }
164}
165
/// Typestate marker for a [`GlobalMirPlan`] produced by the MIR stage that has not
/// yet been passed through [`GlobalMirPlan::resolve`].
#[derive(Clone, Debug)]
pub struct Unresolved;
170
/// Typestate marker for a [`GlobalMirPlan`] returned by [`GlobalMirPlan::resolve`],
/// which sets its `as_of` and `until`. Only such plans can be lowered to a
/// [`GlobalLirPlan`].
#[derive(Clone, Debug)]
pub struct Resolved;
178
179impl Optimize<SubscribeFrom> for Optimizer {
180 type To = GlobalMirPlan<Unresolved>;
181
182 fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
183 let time = Instant::now();
184
185 let mut df_builder = {
186 let compute = self.compute_instance.clone();
187 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
188 };
189 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
190 let mut df_meta = DataflowMetainfo::default();
191
192 match plan {
193 SubscribeFrom::Id(from_id) => {
194 let from = self.catalog.get_entry(&from_id);
195 let from_desc = from
196 .desc(
197 &self
198 .catalog
199 .resolve_full_name(from.name(), self.conn_id.as_ref()),
200 )
201 .expect("subscribes can only be run on items with descs")
202 .into_owned();
203
204 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
205 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
206
207 let sink_description = ComputeSinkDesc {
209 from: from_id,
210 from_desc,
211 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
212 with_snapshot: self.with_snapshot,
213 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
214 non_null_assertions: vec![],
216 refresh_schedule: None,
218 };
219 df_desc.export_sink(self.sink_id, sink_description);
220 }
221 SubscribeFrom::Query { expr, desc } => {
222 let mut transform_ctx = TransformCtx::local(
230 &self.config.features,
231 &self.typecheck_ctx,
232 &mut df_meta,
233 Some(&self.metrics),
234 Some(self.view_id),
235 );
236 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
237
238 df_builder.import_view_into_dataflow(
239 &self.view_id,
240 &expr,
241 &mut df_desc,
242 &self.config.features,
243 )?;
244 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
245
246 let sink_description = ComputeSinkDesc {
248 from: self.view_id,
249 from_desc: RelationDesc::new(expr.typ(), desc.iter_names()),
250 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
251 with_snapshot: self.with_snapshot,
252 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
253 non_null_assertions: vec![],
255 refresh_schedule: None,
257 };
258 df_desc.export_sink(self.sink_id, sink_description);
259 }
260 };
261
262 let style = ExprPrepStyle::Maintained;
264 df_desc.visit_children(
265 |r| prep_relation_expr(r, style),
266 |s| prep_scalar_expr(s, style),
267 )?;
268
269 let mut transform_ctx = TransformCtx::global(
271 &df_builder,
272 &mz_transform::EmptyStatisticsOracle, &self.config.features,
274 &self.typecheck_ctx,
275 &mut df_meta,
276 Some(&self.metrics),
277 );
278 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
280
281 if self.config.mode == OptimizeMode::Explain {
282 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
284 }
285
286 self.duration += time.elapsed();
287
288 Ok(GlobalMirPlan {
290 df_desc,
291 df_meta,
292 phantom: PhantomData::<Unresolved>,
293 })
294 }
295}
296
297impl GlobalMirPlan<Unresolved> {
298 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
304 soft_assert_or_log!(
307 self.df_desc.index_exports.is_empty(),
308 "unexpectedly setting until for a DataflowDescription with an index",
309 );
310
311 self.df_desc.set_as_of(as_of);
313
314 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
318 for (_, sink) in &self.df_desc.sink_exports {
319 self.df_desc.until.join_assign(&sink.up_to);
320 }
321
322 GlobalMirPlan {
323 df_desc: self.df_desc,
324 df_meta: self.df_meta,
325 phantom: PhantomData::<Resolved>,
326 }
327 }
328}
329
330impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
331 type To = GlobalLirPlan;
332
333 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
334 let time = Instant::now();
335
336 let GlobalMirPlan {
337 mut df_desc,
338 df_meta,
339 phantom: _,
340 } = plan;
341
342 for build in df_desc.objects_to_build.iter_mut() {
344 normalize_lets(&mut build.plan.0, &self.config.features)?
345 }
346
347 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
351
352 self.duration += time.elapsed();
353 self.metrics
354 .observe_e2e_optimization_time("subscribe", self.duration);
355
356 Ok(GlobalLirPlan { df_desc, df_meta })
358 }
359}
360
361impl GlobalLirPlan {
362 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
364 (self.df_desc, self.df_meta)
365 }
366}