1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
35use mz_transform::{StatisticsOracle, TransformCtx};
36use timely::progress::Antichain;
37use tracing::warn;
38
39use crate::TimestampContext;
40use crate::catalog::Catalog;
41use crate::coord::CopyToContext;
42use crate::optimize::dataflows::{
43 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
44 prep_scalar_expr,
45};
46use crate::optimize::{
47 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
48 OptimizerError, optimize_mir_local, trace_plan,
49};
50
51pub struct Optimizer {
52 typecheck_ctx: TypecheckContext,
54 repr_typecheck_ctx: ReprTypecheckContext,
56 catalog: Arc<Catalog>,
58 compute_instance: ComputeInstanceSnapshot,
60 select_id: GlobalId,
62 copy_to_context: CopyToContext,
64 config: OptimizerConfig,
66 metrics: OptimizerMetrics,
68 duration: Duration,
70}
71
72impl Optimizer {
73 pub fn new(
74 catalog: Arc<Catalog>,
75 compute_instance: ComputeInstanceSnapshot,
76 select_id: GlobalId,
77 copy_to_context: CopyToContext,
78 config: OptimizerConfig,
79 metrics: OptimizerMetrics,
80 ) -> Self {
81 Self {
82 typecheck_ctx: empty_context(),
83 repr_typecheck_ctx: empty_repr_context(),
84 catalog,
85 compute_instance,
86 select_id,
87 copy_to_context,
88 config,
89 metrics,
90 duration: Default::default(),
91 }
92 }
93
94 pub fn cluster_id(&self) -> ComputeInstanceId {
95 self.compute_instance.instance_id()
96 }
97}
98
99impl Debug for Optimizer {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("OptimizePeek")
107 .field("config", &self.config)
108 .finish_non_exhaustive()
109 }
110}
111
/// Marker for a [`LocalMirPlan`] that has not yet been given the context
/// (timestamp, statistics, session) required for global optimization.
pub struct Unresolved;
115
116#[derive(Clone)]
119pub struct LocalMirPlan<T = Unresolved> {
120 expr: MirRelationExpr,
121 df_meta: DataflowMetainfo,
122 context: T,
123}
124
125pub struct Resolved<'s> {
128 timestamp_ctx: TimestampContext<Timestamp>,
129 stats: Box<dyn StatisticsOracle>,
130 session: &'s dyn SessionMetadata,
131}
132
133#[derive(Debug)]
142pub struct GlobalLirPlan {
143 df_desc: LirDataflowDescription,
144 df_meta: DataflowMetainfo,
145}
146
147impl GlobalLirPlan {
148 pub fn df_desc(&self) -> &LirDataflowDescription {
149 &self.df_desc
150 }
151
152 pub fn sink_id(&self) -> GlobalId {
158 self.df_desc.sink_id()
159 }
160}
161
162impl Optimize<HirRelationExpr> for Optimizer {
163 type To = LocalMirPlan;
164
165 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
166 let time = Instant::now();
167
168 trace_plan!(at: "raw", &expr);
170
171 let expr = expr.lower(&self.config, Some(&self.metrics))?;
173
174 let mut df_meta = DataflowMetainfo::default();
176 let mut transform_ctx = TransformCtx::local(
177 &self.config.features,
178 &self.typecheck_ctx,
179 &self.repr_typecheck_ctx,
180 &mut df_meta,
181 Some(&mut self.metrics),
182 Some(self.select_id),
183 );
184 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
185
186 self.duration += time.elapsed();
187
188 Ok(LocalMirPlan {
190 expr,
191 df_meta,
192 context: Unresolved,
193 })
194 }
195}
196
197impl LocalMirPlan<Unresolved> {
198 pub fn resolve(
201 self,
202 timestamp_ctx: TimestampContext<Timestamp>,
203 session: &dyn SessionMetadata,
204 stats: Box<dyn StatisticsOracle>,
205 ) -> LocalMirPlan<Resolved<'_>> {
206 LocalMirPlan {
207 expr: self.expr,
208 df_meta: self.df_meta,
209 context: Resolved {
210 timestamp_ctx,
211 session,
212 stats,
213 },
214 }
215 }
216}
217
218impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
219 type To = GlobalLirPlan;
220
221 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
222 let time = Instant::now();
223
224 let LocalMirPlan {
225 expr,
226 mut df_meta,
227 context:
228 Resolved {
229 timestamp_ctx,
230 stats,
231 session,
232 },
233 } = plan;
234
235 let expr = OptimizedMirRelationExpr(expr);
236
237 let mut df_builder = {
239 let catalog = self.catalog.state();
240 let compute = self.compute_instance.clone();
241 DataflowBuilder::new(catalog, compute).with_config(&self.config)
242 };
243
244 let debug_name = format!("copy-to-{}", self.select_id);
245 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
246
247 df_builder.import_view_into_dataflow(
248 &self.select_id,
249 &expr,
250 &mut df_desc,
251 &self.config.features,
252 )?;
253 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
254
255 let connection = match &self.copy_to_context.connection {
259 Connection::Aws(aws_connection) => {
260 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
261 upload_info: S3UploadInfo {
262 uri: self.copy_to_context.uri.to_string(),
263 max_file_size: self.copy_to_context.max_file_size,
264 desc: self.copy_to_context.desc.clone(),
265 format: self.copy_to_context.format.clone(),
266 },
267 aws_connection: aws_connection.clone(),
268 connection_id: self.copy_to_context.connection_id,
269 output_batch_count: self
270 .copy_to_context
271 .output_batch_count
272 .expect("output_batch_count should be set in sequencer"),
273 })
274 }
275 _ => {
276 let msg = "only aws connection is supported in COPY TO";
279 return Err(OptimizerError::Internal(msg.to_string()));
280 }
281 };
282 let sink_description = ComputeSinkDesc {
283 from_desc: self.copy_to_context.desc.clone(),
284 from: self.select_id,
285 connection,
286 with_snapshot: true,
287 up_to: Default::default(),
289 non_null_assertions: Vec::new(),
291 refresh_schedule: None,
293 };
294 df_desc.export_sink(self.select_id, sink_description);
295
296 let style = ExprPrepStyle::OneShot {
301 logical_time: EvalTime::Deferred,
302 session,
303 catalog_state: self.catalog.state(),
304 };
305 df_desc.visit_children(
306 |r| prep_relation_expr(r, style),
307 |s| prep_scalar_expr(s, style),
308 )?;
309
310 df_desc.set_as_of(timestamp_ctx.antichain());
312
313 let as_of = df_desc
315 .as_of
316 .clone()
317 .expect("as_of antichain")
318 .into_option()
319 .expect("unique as_of element");
320
321 let style = ExprPrepStyle::OneShot {
323 logical_time: EvalTime::Time(as_of),
324 session,
325 catalog_state: self.catalog.state(),
326 };
327 df_desc.visit_children(
328 |r| prep_relation_expr(r, style),
329 |s| prep_scalar_expr(s, style),
330 )?;
331
332 if let Some(as_of) = timestamp_ctx.timestamp() {
340 if let Some(until) = as_of.checked_add(1) {
341 df_desc.until = Antichain::from_elem(until);
342 for (_, sink) in &mut df_desc.sink_exports {
344 sink.up_to.clone_from(&df_desc.until);
345 }
346 } else {
347 warn!(as_of = %as_of, "as_of + 1 overflow");
348 }
349 }
350
351 let mut transform_ctx = TransformCtx::global(
353 &df_builder,
354 &*stats,
355 &self.config.features,
356 &self.typecheck_ctx,
357 &self.repr_typecheck_ctx,
358 &mut df_meta,
359 Some(&mut self.metrics),
360 );
361 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
363
364 if self.config.mode == OptimizeMode::Explain {
365 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
367 }
368
369 for build in df_desc.objects_to_build.iter_mut() {
371 normalize_lets(&mut build.plan.0, &self.config.features)?
372 }
373
374 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
378
379 trace_plan(&df_desc);
381
382 self.duration += time.elapsed();
383 self.metrics
384 .observe_e2e_optimization_time("copy_to", self.duration);
385
386 Ok(GlobalLirPlan { df_desc, df_meta })
387 }
388}
389
390impl GlobalLirPlan {
391 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
393 (self.df_desc, self.df_meta)
394 }
395}