1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::{StatisticsOracle, TransformCtx};
35use timely::progress::Antichain;
36use tracing::warn;
37
38use crate::TimestampContext;
39use crate::catalog::Catalog;
40use crate::coord::CopyToContext;
41use crate::optimize::dataflows::{
42 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
43 prep_scalar_expr,
44};
45use crate::optimize::{
46 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
47 OptimizerError, optimize_mir_local, trace_plan,
48};
49
/// Optimizer state for a single `COPY TO` statement.
///
/// One instance is driven through both optimization stages implemented in
/// this file: `HirRelationExpr` to [`LocalMirPlan`], and a resolved
/// [`LocalMirPlan`] to [`GlobalLirPlan`].
pub struct Optimizer {
    /// Context passed to both the local and the global `TransformCtx`.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog whose state is used to build the dataflow and to prepare
    /// expressions.
    catalog: Arc<Catalog>,
    /// Snapshot of the compute instance the dataflow is built against.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the optimized query is imported into the dataflow; the
    /// sink is exported under the same id and reads from it.
    select_id: GlobalId,
    /// Destination details (connection, URI, format, file size, ...) from
    /// which the sink connection is assembled.
    copy_to_context: CopyToContext,
    /// Optimizer configuration; its `features` and `mode` are consulted
    /// during optimization.
    config: OptimizerConfig,
    /// Metrics handle; receives the end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Wall-clock time accumulated over the stages run so far.
    duration: Duration,
}
68
69impl Optimizer {
70 pub fn new(
71 catalog: Arc<Catalog>,
72 compute_instance: ComputeInstanceSnapshot,
73 select_id: GlobalId,
74 copy_to_context: CopyToContext,
75 config: OptimizerConfig,
76 metrics: OptimizerMetrics,
77 ) -> Self {
78 Self {
79 repr_typecheck_ctx: empty_repr_context(),
80 catalog,
81 compute_instance,
82 select_id,
83 copy_to_context,
84 config,
85 metrics,
86 duration: Default::default(),
87 }
88 }
89
90 pub fn cluster_id(&self) -> ComputeInstanceId {
91 self.compute_instance.instance_id()
92 }
93}
94
95impl Debug for Optimizer {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("OptimizePeek")
103 .field("config", &self.config)
104 .finish_non_exhaustive()
105 }
106}
107
/// Marker context for a [`LocalMirPlan`] that has not yet been given a
/// timestamp context, session, and statistics oracle.
///
/// It is the default type parameter of [`LocalMirPlan`] and is replaced by
/// [`Resolved`] when `LocalMirPlan::resolve` is called.
///
/// The derives keep the marker usable wherever the plan itself is: in
/// particular, `#[derive(Clone)]` on `LocalMirPlan<T>` only yields a `Clone`
/// impl when `T: Clone`, so without `Clone` here the default
/// `LocalMirPlan<Unresolved>` could not be cloned.
#[derive(Debug, Clone, Copy, Default)]
pub struct Unresolved;
111
/// Result of the first optimization stage: a MIR expression together with
/// the metadata collected while producing it.
///
/// The type parameter `T` carries stage-specific context. The first stage
/// emits the plan with [`Unresolved`]; `resolve` swaps that for [`Resolved`],
/// which is what the second stage accepts.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The MIR expression produced by the first stage.
    expr: MirRelationExpr,
    /// Metadata accumulated during optimization; carried into the next stage.
    df_meta: DataflowMetainfo,
    /// Stage-specific context (see the type-level docs).
    context: T,
}
120
/// Context attached to a [`LocalMirPlan`] by `resolve`; it supplies what the
/// second optimization stage needs beyond the MIR expression itself.
pub struct Resolved<'s> {
    /// Timestamp context from which the dataflow's `as_of` and `until` are
    /// derived.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle handed to the global transform context.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used when preparing expressions for one-shot
    /// evaluation.
    session: &'s dyn SessionMetadata,
}
128
/// Final result of the `COPY TO` optimization pipeline: a finalized LIR
/// dataflow description plus the metadata collected while optimizing it.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// The finalized dataflow description.
    df_desc: LirDataflowDescription,
    /// Metadata accumulated across both optimization stages.
    df_meta: DataflowMetainfo,
}
142
143impl GlobalLirPlan {
144 pub fn df_desc(&self) -> &LirDataflowDescription {
145 &self.df_desc
146 }
147
148 pub fn sink_id(&self) -> GlobalId {
154 self.df_desc.sink_id()
155 }
156}
157
158impl Optimize<HirRelationExpr> for Optimizer {
159 type To = LocalMirPlan;
160
161 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
162 let time = Instant::now();
163
164 trace_plan!(at: "raw", &expr);
166
167 let expr = expr.lower(&self.config, Some(&self.metrics))?;
169
170 let mut df_meta = DataflowMetainfo::default();
172 let mut transform_ctx = TransformCtx::local(
173 &self.config.features,
174 &self.repr_typecheck_ctx,
175 &mut df_meta,
176 Some(&mut self.metrics),
177 Some(self.select_id),
178 );
179 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
180
181 self.duration += time.elapsed();
182
183 Ok(LocalMirPlan {
185 expr,
186 df_meta,
187 context: Unresolved,
188 })
189 }
190}
191
192impl LocalMirPlan<Unresolved> {
193 pub fn resolve(
196 self,
197 timestamp_ctx: TimestampContext<Timestamp>,
198 session: &dyn SessionMetadata,
199 stats: Box<dyn StatisticsOracle>,
200 ) -> LocalMirPlan<Resolved<'_>> {
201 LocalMirPlan {
202 expr: self.expr,
203 df_meta: self.df_meta,
204 context: Resolved {
205 timestamp_ctx,
206 session,
207 stats,
208 },
209 }
210 }
211}
212
213impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
214 type To = GlobalLirPlan;
215
216 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
217 let time = Instant::now();
218
219 let LocalMirPlan {
220 expr,
221 mut df_meta,
222 context:
223 Resolved {
224 timestamp_ctx,
225 stats,
226 session,
227 },
228 } = plan;
229
230 let expr = OptimizedMirRelationExpr(expr);
231
232 let mut df_builder = {
234 let catalog = self.catalog.state();
235 let compute = self.compute_instance.clone();
236 DataflowBuilder::new(catalog, compute).with_config(&self.config)
237 };
238
239 let debug_name = format!("copy-to-{}", self.select_id);
240 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
241
242 df_builder.import_view_into_dataflow(
243 &self.select_id,
244 &expr,
245 &mut df_desc,
246 &self.config.features,
247 )?;
248 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
249
250 let connection = match &self.copy_to_context.connection {
254 Connection::Aws(aws_connection) => {
255 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
256 upload_info: S3UploadInfo {
257 uri: self.copy_to_context.uri.to_string(),
258 max_file_size: self.copy_to_context.max_file_size,
259 desc: self.copy_to_context.desc.clone(),
260 format: self.copy_to_context.format.clone(),
261 },
262 aws_connection: aws_connection.clone(),
263 connection_id: self.copy_to_context.connection_id,
264 output_batch_count: self
265 .copy_to_context
266 .output_batch_count
267 .expect("output_batch_count should be set in sequencer"),
268 })
269 }
270 _ => {
271 let msg = "only aws connection is supported in COPY TO";
274 return Err(OptimizerError::Internal(msg.to_string()));
275 }
276 };
277 let sink_description = ComputeSinkDesc {
278 from_desc: self.copy_to_context.desc.clone(),
279 from: self.select_id,
280 connection,
281 with_snapshot: true,
282 up_to: Default::default(),
284 non_null_assertions: Vec::new(),
286 refresh_schedule: None,
288 };
289 df_desc.export_sink(self.select_id, sink_description);
290
291 let style = ExprPrepStyle::OneShot {
296 logical_time: EvalTime::Deferred,
297 session,
298 catalog_state: self.catalog.state(),
299 };
300 df_desc.visit_children(
301 |r| prep_relation_expr(r, style),
302 |s| prep_scalar_expr(s, style),
303 )?;
304
305 df_desc.set_as_of(timestamp_ctx.antichain());
307
308 let as_of = df_desc
310 .as_of
311 .clone()
312 .expect("as_of antichain")
313 .into_option()
314 .expect("unique as_of element");
315
316 let style = ExprPrepStyle::OneShot {
318 logical_time: EvalTime::Time(as_of),
319 session,
320 catalog_state: self.catalog.state(),
321 };
322 df_desc.visit_children(
323 |r| prep_relation_expr(r, style),
324 |s| prep_scalar_expr(s, style),
325 )?;
326
327 if let Some(as_of) = timestamp_ctx.timestamp() {
335 if let Some(until) = as_of.checked_add(1) {
336 df_desc.until = Antichain::from_elem(until);
337 for (_, sink) in &mut df_desc.sink_exports {
339 sink.up_to.clone_from(&df_desc.until);
340 }
341 } else {
342 warn!(as_of = %as_of, "as_of + 1 overflow");
343 }
344 }
345
346 let mut transform_ctx = TransformCtx::global(
348 &df_builder,
349 &*stats,
350 &self.config.features,
351 &self.repr_typecheck_ctx,
352 &mut df_meta,
353 Some(&mut self.metrics),
354 );
355 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
357
358 if self.config.mode == OptimizeMode::Explain {
359 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
361 }
362
363 for build in df_desc.objects_to_build.iter_mut() {
365 normalize_lets(&mut build.plan.0, &self.config.features)?
366 }
367
368 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
372
373 trace_plan(&df_desc);
375
376 self.duration += time.elapsed();
377 self.metrics
378 .observe_e2e_optimization_time("copy_to", self.duration);
379
380 Ok(GlobalLirPlan { df_desc, df_meta })
381 }
382}
383
384impl GlobalLirPlan {
385 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
387 (self.df_desc, self.df_meta)
388 }
389}