1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
35use mz_transform::{StatisticsOracle, TransformCtx};
36use timely::progress::Antichain;
37use tracing::warn;
38
39use crate::TimestampContext;
40use crate::catalog::Catalog;
41use crate::coord::CopyToContext;
42use crate::optimize::dataflows::{
43 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
44 prep_scalar_expr,
45};
46use crate::optimize::{
47 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
48 OptimizerError, optimize_mir_local, trace_plan,
49};
50
51pub struct Optimizer {
52 typecheck_ctx: TypecheckContext,
54 repr_typecheck_ctx: ReprTypecheckContext,
56 catalog: Arc<Catalog>,
58 compute_instance: ComputeInstanceSnapshot,
60 select_id: GlobalId,
62 copy_to_context: CopyToContext,
64 config: OptimizerConfig,
66 metrics: OptimizerMetrics,
68 duration: Duration,
70}
71
72impl Optimizer {
73 pub fn new(
74 catalog: Arc<Catalog>,
75 compute_instance: ComputeInstanceSnapshot,
76 select_id: GlobalId,
77 copy_to_context: CopyToContext,
78 config: OptimizerConfig,
79 metrics: OptimizerMetrics,
80 ) -> Self {
81 Self {
82 typecheck_ctx: empty_context(),
83 repr_typecheck_ctx: empty_repr_context(),
84 catalog,
85 compute_instance,
86 select_id,
87 copy_to_context,
88 config,
89 metrics,
90 duration: Default::default(),
91 }
92 }
93
94 pub fn cluster_id(&self) -> ComputeInstanceId {
95 self.compute_instance.instance_id()
96 }
97}
98
99impl Debug for Optimizer {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("OptimizePeek")
107 .field("config", &self.config)
108 .finish_non_exhaustive()
109 }
110}
111
/// Marker context for a [`LocalMirPlan`] that has not been given the
/// information required by the global stage yet (see
/// [`LocalMirPlan::resolve`]).
pub struct Unresolved;
115
116#[derive(Clone)]
119pub struct LocalMirPlan<T = Unresolved> {
120 expr: MirRelationExpr,
121 df_meta: DataflowMetainfo,
122 context: T,
123}
124
125pub struct Resolved<'s> {
128 timestamp_ctx: TimestampContext<Timestamp>,
129 stats: Box<dyn StatisticsOracle>,
130 session: &'s dyn SessionMetadata,
131}
132
133#[derive(Debug)]
142pub struct GlobalLirPlan {
143 df_desc: LirDataflowDescription,
144 df_meta: DataflowMetainfo,
145}
146
147impl GlobalLirPlan {
148 pub fn df_desc(&self) -> &LirDataflowDescription {
149 &self.df_desc
150 }
151
152 pub fn sink_id(&self) -> GlobalId {
153 let sink_exports = &self.df_desc.sink_exports;
154 let sink_id = sink_exports.keys().next().expect("valid sink");
155 *sink_id
156 }
157}
158
159impl Optimize<HirRelationExpr> for Optimizer {
160 type To = LocalMirPlan;
161
162 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
163 let time = Instant::now();
164
165 trace_plan!(at: "raw", &expr);
167
168 let expr = expr.lower(&self.config, Some(&self.metrics))?;
170
171 let mut df_meta = DataflowMetainfo::default();
173 let mut transform_ctx = TransformCtx::local(
174 &self.config.features,
175 &self.typecheck_ctx,
176 &self.repr_typecheck_ctx,
177 &mut df_meta,
178 Some(&self.metrics),
179 Some(self.select_id),
180 );
181 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
182
183 self.duration += time.elapsed();
184
185 Ok(LocalMirPlan {
187 expr,
188 df_meta,
189 context: Unresolved,
190 })
191 }
192}
193
194impl LocalMirPlan<Unresolved> {
195 pub fn resolve(
198 self,
199 timestamp_ctx: TimestampContext<Timestamp>,
200 session: &dyn SessionMetadata,
201 stats: Box<dyn StatisticsOracle>,
202 ) -> LocalMirPlan<Resolved<'_>> {
203 LocalMirPlan {
204 expr: self.expr,
205 df_meta: self.df_meta,
206 context: Resolved {
207 timestamp_ctx,
208 session,
209 stats,
210 },
211 }
212 }
213}
214
215impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
216 type To = GlobalLirPlan;
217
218 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
219 let time = Instant::now();
220
221 let LocalMirPlan {
222 expr,
223 mut df_meta,
224 context:
225 Resolved {
226 timestamp_ctx,
227 stats,
228 session,
229 },
230 } = plan;
231
232 let expr = OptimizedMirRelationExpr(expr);
233
234 let mut df_builder = {
236 let catalog = self.catalog.state();
237 let compute = self.compute_instance.clone();
238 DataflowBuilder::new(catalog, compute).with_config(&self.config)
239 };
240
241 let debug_name = format!("copy-to-{}", self.select_id);
242 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
243
244 df_builder.import_view_into_dataflow(
245 &self.select_id,
246 &expr,
247 &mut df_desc,
248 &self.config.features,
249 )?;
250 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
251
252 let connection = match &self.copy_to_context.connection {
256 Connection::Aws(aws_connection) => {
257 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
258 upload_info: S3UploadInfo {
259 uri: self.copy_to_context.uri.to_string(),
260 max_file_size: self.copy_to_context.max_file_size,
261 desc: self.copy_to_context.desc.clone(),
262 format: self.copy_to_context.format.clone(),
263 },
264 aws_connection: aws_connection.clone(),
265 connection_id: self.copy_to_context.connection_id,
266 output_batch_count: self
267 .copy_to_context
268 .output_batch_count
269 .expect("output_batch_count should be set in sequencer"),
270 })
271 }
272 _ => {
273 let msg = "only aws connection is supported in COPY TO";
276 return Err(OptimizerError::Internal(msg.to_string()));
277 }
278 };
279 let sink_description = ComputeSinkDesc {
280 from_desc: self.copy_to_context.desc.clone(),
281 from: self.select_id,
282 connection,
283 with_snapshot: true,
284 up_to: Default::default(),
286 non_null_assertions: Vec::new(),
288 refresh_schedule: None,
290 };
291 df_desc.export_sink(self.select_id, sink_description);
292
293 let style = ExprPrepStyle::OneShot {
298 logical_time: EvalTime::Deferred,
299 session,
300 catalog_state: self.catalog.state(),
301 };
302 df_desc.visit_children(
303 |r| prep_relation_expr(r, style),
304 |s| prep_scalar_expr(s, style),
305 )?;
306
307 df_desc.set_as_of(timestamp_ctx.antichain());
309
310 let as_of = df_desc
312 .as_of
313 .clone()
314 .expect("as_of antichain")
315 .into_option()
316 .expect("unique as_of element");
317
318 let style = ExprPrepStyle::OneShot {
320 logical_time: EvalTime::Time(as_of),
321 session,
322 catalog_state: self.catalog.state(),
323 };
324 df_desc.visit_children(
325 |r| prep_relation_expr(r, style),
326 |s| prep_scalar_expr(s, style),
327 )?;
328
329 if let Some(as_of) = timestamp_ctx.timestamp() {
337 if let Some(until) = as_of.checked_add(1) {
338 df_desc.until = Antichain::from_elem(until);
339 for (_, sink) in &mut df_desc.sink_exports {
341 sink.up_to.clone_from(&df_desc.until);
342 }
343 } else {
344 warn!(as_of = %as_of, "as_of + 1 overflow");
345 }
346 }
347
348 let mut transform_ctx = TransformCtx::global(
350 &df_builder,
351 &*stats,
352 &self.config.features,
353 &self.typecheck_ctx,
354 &self.repr_typecheck_ctx,
355 &mut df_meta,
356 Some(&self.metrics),
357 );
358 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
360
361 if self.config.mode == OptimizeMode::Explain {
362 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
364 }
365
366 for build in df_desc.objects_to_build.iter_mut() {
368 normalize_lets(&mut build.plan.0, &self.config.features)?
369 }
370
371 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
375
376 trace_plan(&df_desc);
378
379 self.duration += time.elapsed();
380 self.metrics
381 .observe_e2e_optimization_time("copy_to", self.duration);
382
383 Ok(GlobalLirPlan { df_desc, df_meta })
384 }
385}
386
387impl GlobalLirPlan {
388 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
390 (self.df_desc, self.df_meta)
391 }
392}