1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::{StatisticsOracle, TransformCtx};
35use timely::progress::Antichain;
36use tracing::warn;
37
38use crate::TimestampContext;
39use crate::catalog::Catalog;
40use crate::coord::CopyToContext;
41use crate::optimize::dataflows::{
42 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
43};
44use crate::optimize::{
45 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
46 OptimizerError, optimize_mir_local, trace_plan,
47};
48
49pub struct Optimizer {
50 repr_typecheck_ctx: ReprTypecheckContext,
52 catalog: Arc<Catalog>,
54 compute_instance: ComputeInstanceSnapshot,
56 select_id: GlobalId,
58 copy_to_context: CopyToContext,
60 config: OptimizerConfig,
62 metrics: OptimizerMetrics,
64 duration: Duration,
66}
67
68impl Optimizer {
69 pub fn new(
70 catalog: Arc<Catalog>,
71 compute_instance: ComputeInstanceSnapshot,
72 select_id: GlobalId,
73 copy_to_context: CopyToContext,
74 config: OptimizerConfig,
75 metrics: OptimizerMetrics,
76 ) -> Self {
77 Self {
78 repr_typecheck_ctx: empty_repr_context(),
79 catalog,
80 compute_instance,
81 select_id,
82 copy_to_context,
83 config,
84 metrics,
85 duration: Default::default(),
86 }
87 }
88
89 pub fn cluster_id(&self) -> ComputeInstanceId {
90 self.compute_instance.instance_id()
91 }
92}
93
94impl Debug for Optimizer {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.debug_struct("OptimizePeek")
102 .field("config", &self.config)
103 .finish_non_exhaustive()
104 }
105}
106
/// Marker for a [`LocalMirPlan`] whose context has not been supplied yet.
///
/// It is the default type parameter of [`LocalMirPlan`] and is swapped for
/// [`Resolved`] by `LocalMirPlan::resolve`.
pub struct Unresolved;
110
/// Result of the first (HIR to locally optimized MIR) optimization stage.
///
/// The type parameter `T` carries the plan's context: [`Unresolved`] right
/// after the first stage, [`Resolved`] once `resolve` has been called.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The MIR expression produced by the local optimization stage.
    expr: MirRelationExpr,
    /// Metadata collected by the transforms that have run so far.
    df_meta: DataflowMetainfo,
    /// Stage-dependent context (see the type-level docs).
    context: T,
}
119
/// Context attached to a [`LocalMirPlan`] by `resolve`, providing what the
/// global optimization stage needs beyond the expression itself.
pub struct Resolved<'s> {
    /// Source of the dataflow's `as_of` (and, when available, its `until`).
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle passed to the global transform context.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used when preparing expressions.
    session: &'s dyn SessionMetadata,
}
127
128#[derive(Debug)]
137pub struct GlobalLirPlan {
138 df_desc: LirDataflowDescription,
139 df_meta: DataflowMetainfo,
140}
141
142impl GlobalLirPlan {
143 pub fn df_desc(&self) -> &LirDataflowDescription {
144 &self.df_desc
145 }
146
147 pub fn sink_id(&self) -> GlobalId {
153 self.df_desc.sink_id()
154 }
155}
156
157impl Optimize<HirRelationExpr> for Optimizer {
158 type To = LocalMirPlan;
159
160 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
161 let time = Instant::now();
162
163 trace_plan!(at: "raw", &expr);
165
166 let expr = expr.lower(&self.config, Some(&self.metrics))?;
168
169 let mut df_meta = DataflowMetainfo::default();
171 let mut transform_ctx = TransformCtx::local(
172 &self.config.features,
173 &self.repr_typecheck_ctx,
174 &mut df_meta,
175 Some(&mut self.metrics),
176 Some(self.select_id),
177 );
178 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
179
180 self.duration += time.elapsed();
181
182 Ok(LocalMirPlan {
184 expr,
185 df_meta,
186 context: Unresolved,
187 })
188 }
189}
190
191impl LocalMirPlan<Unresolved> {
192 pub fn resolve(
195 self,
196 timestamp_ctx: TimestampContext<Timestamp>,
197 session: &dyn SessionMetadata,
198 stats: Box<dyn StatisticsOracle>,
199 ) -> LocalMirPlan<Resolved<'_>> {
200 LocalMirPlan {
201 expr: self.expr,
202 df_meta: self.df_meta,
203 context: Resolved {
204 timestamp_ctx,
205 session,
206 stats,
207 },
208 }
209 }
210}
211
212impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
213 type To = GlobalLirPlan;
214
215 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
216 let time = Instant::now();
217
218 let LocalMirPlan {
219 expr,
220 mut df_meta,
221 context:
222 Resolved {
223 timestamp_ctx,
224 stats,
225 session,
226 },
227 } = plan;
228
229 let expr = OptimizedMirRelationExpr(expr);
230
231 let mut df_builder = {
233 let catalog = self.catalog.state();
234 let compute = self.compute_instance.clone();
235 DataflowBuilder::new(catalog, compute).with_config(&self.config)
236 };
237
238 let debug_name = format!("copy-to-{}", self.select_id);
239 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
240
241 df_builder.import_view_into_dataflow(
242 &self.select_id,
243 &expr,
244 &mut df_desc,
245 &self.config.features,
246 )?;
247 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
248
249 let connection = match &self.copy_to_context.connection {
253 Connection::Aws(aws_connection) => {
254 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
255 upload_info: S3UploadInfo {
256 uri: self.copy_to_context.uri.to_string(),
257 max_file_size: self.copy_to_context.max_file_size,
258 desc: self.copy_to_context.desc.clone(),
259 format: self.copy_to_context.format.clone(),
260 },
261 aws_connection: aws_connection.clone(),
262 connection_id: self.copy_to_context.connection_id,
263 output_batch_count: self
264 .copy_to_context
265 .output_batch_count
266 .expect("output_batch_count should be set in sequencer"),
267 })
268 }
269 _ => {
270 let msg = "only aws connection is supported in COPY TO";
273 return Err(OptimizerError::Internal(msg.to_string()));
274 }
275 };
276 let sink_description = ComputeSinkDesc {
277 from_desc: self.copy_to_context.desc.clone(),
278 from: self.select_id,
279 connection,
280 with_snapshot: true,
281 up_to: Default::default(),
283 non_null_assertions: Vec::new(),
285 refresh_schedule: None,
287 };
288 df_desc.export_sink(self.select_id, sink_description);
289
290 let style = ExprPrepOneShot {
295 logical_time: EvalTime::Deferred,
296 session,
297 catalog_state: self.catalog.state(),
298 };
299 df_desc.visit_children(
300 |r| style.prep_relation_expr(r),
301 |s| style.prep_scalar_expr(s),
302 )?;
303
304 df_desc.set_as_of(timestamp_ctx.antichain());
306
307 let as_of = df_desc
309 .as_of
310 .clone()
311 .expect("as_of antichain")
312 .into_option()
313 .expect("unique as_of element");
314
315 let style = ExprPrepOneShot {
317 logical_time: EvalTime::Time(as_of),
318 session,
319 catalog_state: self.catalog.state(),
320 };
321 df_desc.visit_children(
322 |r| style.prep_relation_expr(r),
323 |s| style.prep_scalar_expr(s),
324 )?;
325
326 if let Some(as_of) = timestamp_ctx.timestamp() {
334 if let Some(until) = as_of.checked_add(1) {
335 df_desc.until = Antichain::from_elem(until);
336 for (_, sink) in &mut df_desc.sink_exports {
338 sink.up_to.clone_from(&df_desc.until);
339 }
340 } else {
341 warn!(as_of = %as_of, "as_of + 1 overflow");
342 }
343 }
344
345 let mut transform_ctx = TransformCtx::global(
347 &df_builder,
348 &*stats,
349 &self.config.features,
350 &self.repr_typecheck_ctx,
351 &mut df_meta,
352 Some(&mut self.metrics),
353 );
354 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
356
357 if self.config.mode == OptimizeMode::Explain {
358 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
360 }
361
362 for build in df_desc.objects_to_build.iter_mut() {
364 normalize_lets(&mut build.plan.0, &self.config.features)?
365 }
366
367 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
371
372 trace_plan(&df_desc);
374
375 self.duration += time.elapsed();
376 self.metrics
377 .observe_e2e_optimization_time("copy_to", self.duration);
378
379 Ok(GlobalLirPlan { df_desc, df_meta })
380 }
381}
382
383impl GlobalLirPlan {
384 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
386 (self.df_desc, self.df_meta)
387 }
388}