1use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_adapter_types::connection::ConnectionId;
18use mz_compute_types::ComputeInstanceId;
19use mz_compute_types::plan::Plan;
20use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_or_log;
23use mz_repr::{GlobalId, RelationDesc, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::SubscribeFrom;
26use mz_transform::TransformCtx;
27use mz_transform::dataflow::DataflowMetainfo;
28use mz_transform::normalize_lets::normalize_lets;
29use mz_transform::reprtypecheck::{
30 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
31};
32use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
33use timely::progress::Antichain;
34
35use crate::CollectionIdBundle;
36use crate::optimize::dataflows::{
37 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, dataflow_import_id_bundle,
38 prep_relation_expr, prep_scalar_expr,
39};
40use crate::optimize::{
41 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
42 OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
43};
44
45pub struct Optimizer {
46 typecheck_ctx: TypecheckContext,
48 repr_typecheck_ctx: ReprTypecheckContext,
50 catalog: Arc<dyn OptimizerCatalog>,
52 compute_instance: ComputeInstanceSnapshot,
54 sink_id: GlobalId,
56 view_id: GlobalId,
59 conn_id: Option<ConnectionId>,
61 with_snapshot: bool,
63 up_to: Option<Timestamp>,
65 debug_name: String,
67 config: OptimizerConfig,
69 metrics: OptimizerMetrics,
71 duration: Duration,
73}
74
75impl std::fmt::Debug for Optimizer {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("Optimizer")
83 .field("config", &self.config)
84 .finish_non_exhaustive()
85 }
86}
87
88impl Optimizer {
89 pub fn new(
90 catalog: Arc<dyn OptimizerCatalog>,
91 compute_instance: ComputeInstanceSnapshot,
92 view_id: GlobalId,
93 sink_id: GlobalId,
94 conn_id: Option<ConnectionId>,
95 with_snapshot: bool,
96 up_to: Option<Timestamp>,
97 debug_name: String,
98 config: OptimizerConfig,
99 metrics: OptimizerMetrics,
100 ) -> Self {
101 Self {
102 typecheck_ctx: empty_context(),
103 repr_typecheck_ctx: empty_repr_context(),
104 catalog,
105 compute_instance,
106 view_id,
107 sink_id,
108 conn_id,
109 with_snapshot,
110 up_to,
111 debug_name,
112 config,
113 metrics,
114 duration: Default::default(),
115 }
116 }
117
118 pub fn cluster_id(&self) -> ComputeInstanceId {
119 self.compute_instance.instance_id()
120 }
121
122 pub fn up_to(&self) -> Option<Timestamp> {
123 self.up_to.clone()
124 }
125}
126
127#[derive(Clone, Debug)]
133pub struct GlobalMirPlan<T: Clone> {
134 df_desc: MirDataflowDescription,
135 df_meta: DataflowMetainfo,
136 phantom: PhantomData<T>,
137}
138
139impl<T: Clone> GlobalMirPlan<T> {
140 pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
142 dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
143 }
144}
145
146#[derive(Clone, Debug)]
149pub struct GlobalLirPlan {
150 df_desc: LirDataflowDescription,
151 df_meta: DataflowMetainfo,
152}
153
154impl GlobalLirPlan {
155 pub fn sink_id(&self) -> GlobalId {
161 self.df_desc.sink_id()
162 }
163
164 pub fn as_of(&self) -> Option<Timestamp> {
165 self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
166 }
167
168 pub fn sink_desc(&self) -> &ComputeSinkDesc {
174 let sink_exports = &self.df_desc.sink_exports;
175 let sink_desc = sink_exports.values().into_element();
176 sink_desc
177 }
178}
179
/// Marker for a `GlobalMirPlan` on which `resolve` has not been called yet,
/// i.e. the state produced by optimizing a `SubscribeFrom`.
#[derive(Clone, Debug)]
pub struct Unresolved;
184
/// Marker for a `GlobalMirPlan` returned by `resolve`, i.e. one whose
/// dataflow has had its `as_of` and `until` set. Only plans in this state
/// can be lowered to a `GlobalLirPlan`.
#[derive(Clone, Debug)]
pub struct Resolved;
192
193impl Optimize<SubscribeFrom> for Optimizer {
194 type To = GlobalMirPlan<Unresolved>;
195
196 fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
197 let time = Instant::now();
198
199 let mut df_builder = {
200 let compute = self.compute_instance.clone();
201 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
202 };
203 let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
204 let mut df_meta = DataflowMetainfo::default();
205
206 match plan {
207 SubscribeFrom::Id(from_id) => {
208 let from = self.catalog.get_entry(&from_id);
209 let from_desc = from
210 .desc(
211 &self
212 .catalog
213 .resolve_full_name(from.name(), self.conn_id.as_ref()),
214 )
215 .expect("subscribes can only be run on items with descs")
216 .into_owned();
217
218 df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
219 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
220
221 let sink_description = ComputeSinkDesc {
223 from: from_id,
224 from_desc,
225 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
226 with_snapshot: self.with_snapshot,
227 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
228 non_null_assertions: vec![],
230 refresh_schedule: None,
232 };
233 df_desc.export_sink(self.sink_id, sink_description);
234 }
235 SubscribeFrom::Query { expr, desc } => {
236 let mut transform_ctx = TransformCtx::local(
244 &self.config.features,
245 &self.typecheck_ctx,
246 &self.repr_typecheck_ctx,
247 &mut df_meta,
248 Some(&mut self.metrics),
249 Some(self.view_id),
250 );
251 let expr = optimize_mir_local(expr, &mut transform_ctx)?;
252
253 df_builder.import_view_into_dataflow(
254 &self.view_id,
255 &expr,
256 &mut df_desc,
257 &self.config.features,
258 )?;
259 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
260
261 let sink_description = ComputeSinkDesc {
263 from: self.view_id,
264 from_desc: RelationDesc::new(expr.typ(), desc.iter_names()),
265 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
266 with_snapshot: self.with_snapshot,
267 up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
268 non_null_assertions: vec![],
270 refresh_schedule: None,
272 };
273 df_desc.export_sink(self.sink_id, sink_description);
274 }
275 };
276
277 let style = ExprPrepStyle::Maintained;
279 df_desc.visit_children(
280 |r| prep_relation_expr(r, style),
281 |s| prep_scalar_expr(s, style),
282 )?;
283
284 let mut transform_ctx = TransformCtx::global(
286 &df_builder,
287 &mz_transform::EmptyStatisticsOracle, &self.config.features,
289 &self.typecheck_ctx,
290 &self.repr_typecheck_ctx,
291 &mut df_meta,
292 Some(&mut self.metrics),
293 );
294 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
296
297 if self.config.mode == OptimizeMode::Explain {
298 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
300 }
301
302 self.duration += time.elapsed();
303
304 Ok(GlobalMirPlan {
306 df_desc,
307 df_meta,
308 phantom: PhantomData::<Unresolved>,
309 })
310 }
311}
312
313impl GlobalMirPlan<Unresolved> {
314 pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
320 soft_assert_or_log!(
323 self.df_desc.index_exports.is_empty(),
324 "unexpectedly setting until for a DataflowDescription with an index",
325 );
326
327 self.df_desc.set_as_of(as_of);
329
330 self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
334 for (_, sink) in &self.df_desc.sink_exports {
335 self.df_desc.until.join_assign(&sink.up_to);
336 }
337
338 GlobalMirPlan {
339 df_desc: self.df_desc,
340 df_meta: self.df_meta,
341 phantom: PhantomData::<Resolved>,
342 }
343 }
344}
345
346impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
347 type To = GlobalLirPlan;
348
349 fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
350 let time = Instant::now();
351
352 let GlobalMirPlan {
353 mut df_desc,
354 df_meta,
355 phantom: _,
356 } = plan;
357
358 for build in df_desc.objects_to_build.iter_mut() {
360 normalize_lets(&mut build.plan.0, &self.config.features)?
361 }
362
363 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
367
368 self.duration += time.elapsed();
369 self.metrics
370 .observe_e2e_optimization_time("subscribe", self.duration);
371
372 Ok(GlobalLirPlan { df_desc, df_meta })
374 }
375}
376
377impl GlobalLirPlan {
378 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
380 (self.df_desc, self.df_meta)
381 }
382}