1use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19 ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::warn;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::CopyToContext;
39use crate::optimize::dataflows::{
40 ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
41};
42use crate::optimize::{
43 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
44 OptimizerError, optimize_mir_local, trace_plan,
45};
46
47pub struct Optimizer {
48 typecheck_ctx: SharedTypecheckingContext,
50 catalog: Arc<Catalog>,
52 compute_instance: ComputeInstanceSnapshot,
54 select_id: GlobalId,
56 copy_to_context: CopyToContext,
58 config: OptimizerConfig,
60 metrics: OptimizerMetrics,
62 duration: Duration,
64}
65
66impl Optimizer {
67 pub fn new(
68 catalog: Arc<Catalog>,
69 compute_instance: ComputeInstanceSnapshot,
70 select_id: GlobalId,
71 copy_to_context: CopyToContext,
72 config: OptimizerConfig,
73 metrics: OptimizerMetrics,
74 ) -> Self {
75 Self {
76 typecheck_ctx: empty_typechecking_context(),
77 catalog,
78 compute_instance,
79 select_id,
80 copy_to_context,
81 config,
82 metrics,
83 duration: Default::default(),
84 }
85 }
86
87 pub fn cluster_id(&self) -> ComputeInstanceId {
88 self.compute_instance.instance_id()
89 }
90}
91
92impl Debug for Optimizer {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("OptimizePeek")
100 .field("config", &self.config)
101 .finish_non_exhaustive()
102 }
103}
104
/// Marker context for a [`LocalMirPlan`] that has not yet been given a
/// timestamp context, statistics oracle, or session (see
/// [`LocalMirPlan::resolve`]).
pub struct Unresolved;
108
109#[derive(Clone)]
112pub struct LocalMirPlan<T = Unresolved> {
113 expr: MirRelationExpr,
114 df_meta: DataflowMetainfo,
115 context: T,
116}
117
118pub struct Resolved<'s> {
121 timestamp_ctx: TimestampContext<Timestamp>,
122 stats: Box<dyn StatisticsOracle>,
123 session: &'s dyn SessionMetadata,
124}
125
126#[derive(Debug)]
135pub struct GlobalLirPlan {
136 df_desc: LirDataflowDescription,
137 df_meta: DataflowMetainfo,
138}
139
140impl GlobalLirPlan {
141 pub fn df_desc(&self) -> &LirDataflowDescription {
142 &self.df_desc
143 }
144
145 pub fn sink_id(&self) -> GlobalId {
151 self.df_desc.sink_id()
152 }
153}
154
155impl Optimize<HirRelationExpr> for Optimizer {
156 type To = LocalMirPlan;
157
158 fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
159 let time = Instant::now();
160
161 trace_plan!(at: "raw", &expr);
163
164 let expr = expr.lower(&self.config, Some(&self.metrics))?;
166
167 let mut df_meta = DataflowMetainfo::default();
169 let mut transform_ctx = TransformCtx::local(
170 &self.config.features,
171 &self.typecheck_ctx,
172 &mut df_meta,
173 Some(&mut self.metrics),
174 Some(self.select_id),
175 );
176 let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
177
178 self.duration += time.elapsed();
179
180 Ok(LocalMirPlan {
182 expr,
183 df_meta,
184 context: Unresolved,
185 })
186 }
187}
188
189impl LocalMirPlan<Unresolved> {
190 pub fn resolve(
193 self,
194 timestamp_ctx: TimestampContext<Timestamp>,
195 session: &dyn SessionMetadata,
196 stats: Box<dyn StatisticsOracle>,
197 ) -> LocalMirPlan<Resolved<'_>> {
198 LocalMirPlan {
199 expr: self.expr,
200 df_meta: self.df_meta,
201 context: Resolved {
202 timestamp_ctx,
203 session,
204 stats,
205 },
206 }
207 }
208}
209
210impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
211 type To = GlobalLirPlan;
212
213 fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
214 let time = Instant::now();
215
216 let LocalMirPlan {
217 expr,
218 mut df_meta,
219 context:
220 Resolved {
221 timestamp_ctx,
222 stats,
223 session,
224 },
225 } = plan;
226
227 let expr = OptimizedMirRelationExpr(expr);
228
229 let mut df_builder = {
231 let catalog = self.catalog.state();
232 let compute = self.compute_instance.clone();
233 DataflowBuilder::new(catalog, compute).with_config(&self.config)
234 };
235
236 let debug_name = format!("copy-to-{}", self.select_id);
237 let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
238
239 df_builder.import_view_into_dataflow(
240 &self.select_id,
241 &expr,
242 &mut df_desc,
243 &self.config.features,
244 )?;
245 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
246
247 let connection = match &self.copy_to_context.connection {
251 Connection::Aws(aws_connection) => {
252 ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
253 upload_info: S3UploadInfo {
254 uri: self.copy_to_context.uri.to_string(),
255 max_file_size: self.copy_to_context.max_file_size,
256 desc: self.copy_to_context.desc.clone(),
257 format: self.copy_to_context.format.clone(),
258 },
259 aws_connection: aws_connection.clone(),
260 connection_id: self.copy_to_context.connection_id,
261 output_batch_count: self
262 .copy_to_context
263 .output_batch_count
264 .expect("output_batch_count should be set in sequencer"),
265 })
266 }
267 _ => {
268 let msg = "only aws connection is supported in COPY TO";
271 return Err(OptimizerError::Internal(msg.to_string()));
272 }
273 };
274 let sink_description = ComputeSinkDesc {
275 from_desc: self.copy_to_context.desc.clone(),
276 from: self.select_id,
277 connection,
278 with_snapshot: true,
279 up_to: Default::default(),
281 non_null_assertions: Vec::new(),
283 refresh_schedule: None,
285 };
286 df_desc.export_sink(self.select_id, sink_description);
287
288 df_desc.set_as_of(timestamp_ctx.antichain());
290
291 let as_of = df_desc
293 .as_of
294 .clone()
295 .expect("as_of antichain")
296 .into_option()
297 .expect("unique as_of element");
298
299 let style = ExprPrepOneShot {
301 logical_time: EvalTime::Time(as_of),
302 session,
303 catalog_state: self.catalog.state(),
304 };
305 df_desc.visit_children(
306 |r| style.prep_relation_expr(r),
307 |s| style.prep_scalar_expr(s),
308 )?;
309
310 if let Some(as_of) = timestamp_ctx.timestamp() {
318 if let Some(until) = as_of.checked_add(1) {
319 df_desc.until = Antichain::from_elem(until);
320 for (_, sink) in &mut df_desc.sink_exports {
322 sink.up_to.clone_from(&df_desc.until);
323 }
324 } else {
325 warn!(as_of = %as_of, "as_of + 1 overflow");
326 }
327 }
328
329 let mut transform_ctx = TransformCtx::global(
331 &df_builder,
332 &*stats,
333 &self.config.features,
334 &self.typecheck_ctx,
335 &mut df_meta,
336 Some(&mut self.metrics),
337 );
338 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
340
341 if self.config.mode == OptimizeMode::Explain {
342 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
344 }
345
346 for build in df_desc.objects_to_build.iter_mut() {
348 normalize_lets(&mut build.plan.0, &self.config.features)?
349 }
350
351 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
355
356 trace_plan(&df_desc);
358
359 self.duration += time.elapsed();
360 self.metrics
361 .observe_e2e_optimization_time("copy_to", self.duration);
362
363 Ok(GlobalLirPlan { df_desc, df_meta })
364 }
365}
366
367impl GlobalLirPlan {
368 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
370 (self.df_desc, self.df_meta)
371 }
372}