1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::warn;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::CopyToContext;
39use crate::optimize::dataflows::{
40 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
41 prep_scalar_expr,
42};
43use crate::optimize::{
44 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
45 OptimizerError, optimize_mir_local, trace_plan,
46};
47
48pub struct Optimizer {
49 typecheck_ctx: TypecheckContext,
51 catalog: Arc<Catalog>,
53 compute_instance: ComputeInstanceSnapshot,
55 select_id: GlobalId,
57 copy_to_context: CopyToContext,
59 config: OptimizerConfig,
61 metrics: OptimizerMetrics,
63 duration: Duration,
65}
66
67impl Optimizer {
68 pub fn new(
69 catalog: Arc<Catalog>,
70 compute_instance: ComputeInstanceSnapshot,
71 select_id: GlobalId,
72 copy_to_context: CopyToContext,
73 config: OptimizerConfig,
74 metrics: OptimizerMetrics,
75 ) -> Self {
76 Self {
77 typecheck_ctx: empty_context(),
78 catalog,
79 compute_instance,
80 select_id,
81 copy_to_context,
82 config,
83 metrics,
84 duration: Default::default(),
85 }
86 }
87
88 pub fn cluster_id(&self) -> ComputeInstanceId {
89 self.compute_instance.instance_id()
90 }
91}
92
93impl Debug for Optimizer {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("OptimizePeek")
101 .field("config", &self.config)
102 .finish_non_exhaustive()
103 }
104}
105
/// Marker type used as the default context of a `LocalMirPlan`, i.e. a plan
/// that has not yet been given a resolved context.
///
/// The type is a zero-sized unit struct, so it is trivially `Copy`; deriving
/// `Clone` also lets a `#[derive(Clone)]` on a container generic over this
/// marker actually apply.
#[derive(Debug, Clone, Copy)]
pub struct Unresolved;
109
110#[derive(Clone)]
113pub struct LocalMirPlan<T = Unresolved> {
114 expr: MirRelationExpr,
115 df_meta: DataflowMetainfo,
116 context: T,
117}
118
119pub struct Resolved<'s> {
122 timestamp_ctx: TimestampContext<Timestamp>,
123 stats: Box<dyn StatisticsOracle>,
124 session: &'s dyn SessionMetadata,
125}
126
127#[derive(Debug)]
136pub struct GlobalLirPlan {
137 df_desc: LirDataflowDescription,
138 df_meta: DataflowMetainfo,
139}
140
141impl GlobalLirPlan {
142 pub fn df_desc(&self) -> &LirDataflowDescription {
143 &self.df_desc
144 }
145
146 pub fn sink_id(&self) -> GlobalId {
147 let sink_exports = &self.df_desc.sink_exports;
148 let sink_id = sink_exports.keys().next().expect("valid sink");
149 *sink_id
150 }
151}
152
153impl Optimize<HirRelationExpr> for Optimizer {
154 type To = LocalMirPlan;
155
156 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
157 let time = Instant::now();
158
159 trace_plan!(at: "raw", &expr);
161
162 let expr = expr.lower(&self.config, Some(&self.metrics))?;
164
165 let mut df_meta = DataflowMetainfo::default();
167 let mut transform_ctx = TransformCtx::local(
168 &self.config.features,
169 &self.typecheck_ctx,
170 &mut df_meta,
171 Some(&self.metrics),
172 Some(self.select_id),
173 );
174 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
175
176 self.duration += time.elapsed();
177
178 Ok(LocalMirPlan {
180 expr,
181 df_meta,
182 context: Unresolved,
183 })
184 }
185}
186
187impl LocalMirPlan<Unresolved> {
188 pub fn resolve(
191 self,
192 timestamp_ctx: TimestampContext<Timestamp>,
193 session: &dyn SessionMetadata,
194 stats: Box<dyn StatisticsOracle>,
195 ) -> LocalMirPlan<Resolved<'_>> {
196 LocalMirPlan {
197 expr: self.expr,
198 df_meta: self.df_meta,
199 context: Resolved {
200 timestamp_ctx,
201 session,
202 stats,
203 },
204 }
205 }
206}
207
208impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
209 type To = GlobalLirPlan;
210
211 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
212 let time = Instant::now();
213
214 let LocalMirPlan {
215 expr,
216 mut df_meta,
217 context:
218 Resolved {
219 timestamp_ctx,
220 stats,
221 session,
222 },
223 } = plan;
224
225 let expr = OptimizedMirRelationExpr(expr);
226
227 let mut df_builder = {
229 let catalog = self.catalog.state();
230 let compute = self.compute_instance.clone();
231 DataflowBuilder::new(catalog, compute).with_config(&self.config)
232 };
233
234 let debug_name = format!("copy-to-{}", self.select_id);
235 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
236
237 df_builder.import_view_into_dataflow(
238 &self.select_id,
239 &expr,
240 &mut df_desc,
241 &self.config.features,
242 )?;
243 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
244
245 let connection = match &self.copy_to_context.connection {
249 Connection::Aws(aws_connection) => {
250 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
251 upload_info: S3UploadInfo {
252 uri: self.copy_to_context.uri.to_string(),
253 max_file_size: self.copy_to_context.max_file_size,
254 desc: self.copy_to_context.desc.clone(),
255 format: self.copy_to_context.format.clone(),
256 },
257 aws_connection: aws_connection.clone(),
258 connection_id: self.copy_to_context.connection_id,
259 output_batch_count: self
260 .copy_to_context
261 .output_batch_count
262 .expect("output_batch_count should be set in sequencer"),
263 })
264 }
265 _ => {
266 let msg = "only aws connection is supported in COPY TO";
269 return Err(OptimizerError::Internal(msg.to_string()));
270 }
271 };
272 let sink_description = ComputeSinkDesc {
273 from_desc: self.copy_to_context.desc.clone(),
274 from: self.select_id,
275 connection,
276 with_snapshot: true,
277 up_to: Default::default(),
279 non_null_assertions: Vec::new(),
281 refresh_schedule: None,
283 };
284 df_desc.export_sink(self.select_id, sink_description);
285
286 let style = ExprPrepStyle::OneShot {
291 logical_time: EvalTime::Deferred,
292 session,
293 catalog_state: self.catalog.state(),
294 };
295 df_desc.visit_children(
296 |r| prep_relation_expr(r, style),
297 |s| prep_scalar_expr(s, style),
298 )?;
299
300 df_desc.set_as_of(timestamp_ctx.antichain());
302
303 if let Some(as_of) = timestamp_ctx.timestamp() {
311 if let Some(until) = as_of.checked_add(1) {
312 df_desc.until = Antichain::from_elem(until);
313 for (_, sink) in &mut df_desc.sink_exports {
315 sink.up_to.clone_from(&df_desc.until);
316 }
317 } else {
318 warn!(as_of = %as_of, "as_of + 1 overflow");
319 }
320 }
321
322 let mut transform_ctx = TransformCtx::global(
324 &df_builder,
325 &*stats,
326 &self.config.features,
327 &self.typecheck_ctx,
328 &mut df_meta,
329 Some(&self.metrics),
330 );
331 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
333
334 if self.config.mode == OptimizeMode::Explain {
335 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
337 }
338
339 let as_of = df_desc
341 .as_of
342 .clone()
343 .expect("as_of antichain")
344 .into_option()
345 .expect("unique as_of element");
346
347 let style = ExprPrepStyle::OneShot {
349 logical_time: EvalTime::Time(as_of),
350 session,
351 catalog_state: self.catalog.state(),
352 };
353 df_desc.visit_children(
354 |r| prep_relation_expr(r, style),
355 |s| prep_scalar_expr(s, style),
356 )?;
357
358 for build in df_desc.objects_to_build.iter_mut() {
360 normalize_lets(&mut build.plan.0, &self.config.features)?
361 }
362
363 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
367
368 trace_plan(&df_desc);
370
371 self.duration += time.elapsed();
372 self.metrics
373 .observe_e2e_optimization_time("copy_to", self.duration);
374
375 Ok(GlobalLirPlan { df_desc, df_meta })
376 }
377}
378
379impl GlobalLirPlan {
380 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
382 (self.df_desc, self.df_meta)
383 }
384}