1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::warn;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::CopyToContext;
39use crate::optimize::dataflows::{
40 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
41 prep_scalar_expr,
42};
43use crate::optimize::{
44 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
45 OptimizerError, optimize_mir_local, trace_plan,
46};
47
48pub struct Optimizer {
49 typecheck_ctx: TypecheckContext,
51 catalog: Arc<Catalog>,
53 compute_instance: ComputeInstanceSnapshot,
55 select_id: GlobalId,
57 copy_to_context: CopyToContext,
59 config: OptimizerConfig,
61 metrics: OptimizerMetrics,
63 duration: Duration,
65}
66
67impl Optimizer {
68 pub fn new(
69 catalog: Arc<Catalog>,
70 compute_instance: ComputeInstanceSnapshot,
71 select_id: GlobalId,
72 copy_to_context: CopyToContext,
73 config: OptimizerConfig,
74 metrics: OptimizerMetrics,
75 ) -> Self {
76 Self {
77 typecheck_ctx: empty_context(),
78 catalog,
79 compute_instance,
80 select_id,
81 copy_to_context,
82 config,
83 metrics,
84 duration: Default::default(),
85 }
86 }
87
88 pub fn cluster_id(&self) -> ComputeInstanceId {
89 self.compute_instance.instance_id()
90 }
91}
92
93impl Debug for Optimizer {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("OptimizePeek")
101 .field("config", &self.config)
102 .finish_non_exhaustive()
103 }
104}
105
/// Marker context for a [`LocalMirPlan`] that has not yet been given a
/// timestamp context, session and statistics oracle.
pub struct Unresolved;
109
110#[derive(Clone)]
113pub struct LocalMirPlan<T = Unresolved> {
114 expr: MirRelationExpr,
115 df_meta: DataflowMetainfo,
116 context: T,
117}
118
119pub struct Resolved<'s> {
122 timestamp_ctx: TimestampContext<Timestamp>,
123 stats: Box<dyn StatisticsOracle>,
124 session: &'s dyn SessionMetadata,
125}
126
127#[derive(Debug)]
136pub struct GlobalLirPlan {
137 df_desc: LirDataflowDescription,
138 df_meta: DataflowMetainfo,
139}
140
141impl GlobalLirPlan {
142 pub fn df_desc(&self) -> &LirDataflowDescription {
143 &self.df_desc
144 }
145
146 pub fn sink_id(&self) -> GlobalId {
147 let sink_exports = &self.df_desc.sink_exports;
148 let sink_id = sink_exports.keys().next().expect("valid sink");
149 *sink_id
150 }
151}
152
153impl Optimize<HirRelationExpr> for Optimizer {
154 type To = LocalMirPlan;
155
156 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
157 let time = Instant::now();
158
159 trace_plan!(at: "raw", &expr);
161
162 let expr = expr.lower(&self.config, Some(&self.metrics))?;
164
165 let mut df_meta = DataflowMetainfo::default();
167 let mut transform_ctx = TransformCtx::local(
168 &self.config.features,
169 &self.typecheck_ctx,
170 &mut df_meta,
171 Some(&self.metrics),
172 );
173 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
174
175 self.duration += time.elapsed();
176
177 Ok(LocalMirPlan {
179 expr,
180 df_meta,
181 context: Unresolved,
182 })
183 }
184}
185
186impl LocalMirPlan<Unresolved> {
187 pub fn resolve(
190 self,
191 timestamp_ctx: TimestampContext<Timestamp>,
192 session: &dyn SessionMetadata,
193 stats: Box<dyn StatisticsOracle>,
194 ) -> LocalMirPlan<Resolved> {
195 LocalMirPlan {
196 expr: self.expr,
197 df_meta: self.df_meta,
198 context: Resolved {
199 timestamp_ctx,
200 session,
201 stats,
202 },
203 }
204 }
205}
206
207impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
208 type To = GlobalLirPlan;
209
210 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
211 let time = Instant::now();
212
213 let LocalMirPlan {
214 expr,
215 mut df_meta,
216 context:
217 Resolved {
218 timestamp_ctx,
219 stats,
220 session,
221 },
222 } = plan;
223
224 let expr = OptimizedMirRelationExpr(expr);
225
226 let mut df_builder = {
228 let catalog = self.catalog.state();
229 let compute = self.compute_instance.clone();
230 DataflowBuilder::new(catalog, compute).with_config(&self.config)
231 };
232
233 let debug_name = format!("copy-to-{}", self.select_id);
234 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
235
236 df_builder.import_view_into_dataflow(
237 &self.select_id,
238 &expr,
239 &mut df_desc,
240 &self.config.features,
241 )?;
242 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
243
244 let connection = match &self.copy_to_context.connection {
248 Connection::Aws(aws_connection) => {
249 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
250 upload_info: S3UploadInfo {
251 uri: self.copy_to_context.uri.to_string(),
252 max_file_size: self.copy_to_context.max_file_size,
253 desc: self.copy_to_context.desc.clone(),
254 format: self.copy_to_context.format.clone(),
255 },
256 aws_connection: aws_connection.clone(),
257 connection_id: self.copy_to_context.connection_id,
258 output_batch_count: self
259 .copy_to_context
260 .output_batch_count
261 .expect("output_batch_count should be set in sequencer"),
262 })
263 }
264 _ => {
265 let msg = "only aws connection is supported in COPY TO";
268 return Err(OptimizerError::Internal(msg.to_string()));
269 }
270 };
271 let sink_description = ComputeSinkDesc {
272 from_desc: self.copy_to_context.desc.clone(),
273 from: self.select_id,
274 connection,
275 with_snapshot: true,
276 up_to: Default::default(),
278 non_null_assertions: Vec::new(),
280 refresh_schedule: None,
282 };
283 df_desc.export_sink(self.select_id, sink_description);
284
285 let style = ExprPrepStyle::OneShot {
290 logical_time: EvalTime::Deferred,
291 session,
292 catalog_state: self.catalog.state(),
293 };
294 df_desc.visit_children(
295 |r| prep_relation_expr(r, style),
296 |s| prep_scalar_expr(s, style),
297 )?;
298
299 df_desc.set_as_of(timestamp_ctx.antichain());
301
302 if let Some(as_of) = timestamp_ctx.timestamp() {
310 if let Some(until) = as_of.checked_add(1) {
311 df_desc.until = Antichain::from_elem(until);
312 for (_, sink) in &mut df_desc.sink_exports {
314 sink.up_to.clone_from(&df_desc.until);
315 }
316 } else {
317 warn!(as_of = %as_of, "as_of + 1 overflow");
318 }
319 }
320
321 let mut transform_ctx = TransformCtx::global(
323 &df_builder,
324 &*stats,
325 &self.config.features,
326 &self.typecheck_ctx,
327 &mut df_meta,
328 Some(&self.metrics),
329 );
330 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
332
333 if self.config.mode == OptimizeMode::Explain {
334 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
336 }
337
338 let as_of = df_desc
340 .as_of
341 .clone()
342 .expect("as_of antichain")
343 .into_option()
344 .expect("unique as_of element");
345
346 let style = ExprPrepStyle::OneShot {
348 logical_time: EvalTime::Time(as_of),
349 session,
350 catalog_state: self.catalog.state(),
351 };
352 df_desc.visit_children(
353 |r| prep_relation_expr(r, style),
354 |s| prep_scalar_expr(s, style),
355 )?;
356
357 for build in df_desc.objects_to_build.iter_mut() {
359 normalize_lets(&mut build.plan.0, &self.config.features)?
360 }
361
362 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
366
367 trace_plan(&df_desc);
369
370 self.duration += time.elapsed();
371 self.metrics
372 .observe_e2e_optimization_time("copy_to", self.duration);
373
374 Ok(GlobalLirPlan { df_desc, df_meta })
375 }
376}
377
378impl GlobalLirPlan {
379 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
381 (self.df_desc, self.df_meta)
382 }
383}