mz_adapter/optimize/
copy_to.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `COPY TO` statements.
11
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19    ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
35use mz_transform::{StatisticsOracle, TransformCtx};
36use timely::progress::Antichain;
37use tracing::warn;
38
39use crate::TimestampContext;
40use crate::catalog::Catalog;
41use crate::coord::CopyToContext;
42use crate::optimize::dataflows::{
43    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
44    prep_scalar_expr,
45};
46use crate::optimize::{
47    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
48    OptimizerError, optimize_mir_local, trace_plan,
49};
50
/// Optimizer for `COPY TO` statements.
///
/// Holds the state shared by the optimization stages implemented in this
/// file and accumulates the time spent across those stages.
pub struct Optimizer {
    /// A typechecking context to use throughout the optimizer pipeline.
    typecheck_ctx: TypecheckContext,
    /// A representation typechecking context to use throughout the optimizer pipeline.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<Catalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used when constructing the dataflow. It is
    /// used both for the view imported into the dataflow and for the sink
    /// export.
    select_id: GlobalId,
    /// Data required to do a COPY TO query (connection, target URI, output
    /// description, format, file size and batch count settings).
    copy_to_context: CopyToContext,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far, summed over all
    /// `optimize` calls on this optimizer.
    duration: Duration,
}
71
72impl Optimizer {
73    pub fn new(
74        catalog: Arc<Catalog>,
75        compute_instance: ComputeInstanceSnapshot,
76        select_id: GlobalId,
77        copy_to_context: CopyToContext,
78        config: OptimizerConfig,
79        metrics: OptimizerMetrics,
80    ) -> Self {
81        Self {
82            typecheck_ctx: empty_context(),
83            repr_typecheck_ctx: empty_repr_context(),
84            catalog,
85            compute_instance,
86            select_id,
87            copy_to_context,
88            config,
89            metrics,
90            duration: Default::default(),
91        }
92    }
93
94    pub fn cluster_id(&self) -> ComputeInstanceId {
95        self.compute_instance.instance_id()
96    }
97}
98
99// A bogey `Debug` implementation that hides fields. This is needed to make the
100// `event!` call in `sequence_peek_stage` not emit a lot of data.
101//
102// For now, we skip almost all fields, but we might revisit that bit if it turns
103// out that we really need those for debugging purposes.
104impl Debug for Optimizer {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("OptimizePeek")
107            .field("config", &self.config)
108            .finish_non_exhaustive()
109    }
110}
111
/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
///
/// This is the default type parameter of [`LocalMirPlan`].
pub struct Unresolved;
115
/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
/// and MIR optimization.
///
/// The type parameter `T` carries the stage-specific context; it defaults to
/// [`Unresolved`].
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The MIR expression produced by the local optimization stage.
    expr: MirRelationExpr,
    /// Dataflow metainfo that was handed to the transform context while
    /// optimizing `expr`.
    df_meta: DataflowMetainfo,
    /// Stage marker / contextual information attached to the plan.
    context: T,
}
124
/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    /// Timestamp context from which the dataflow's `as_of` (and, when
    /// available, its `until`) is derived.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle passed to the global transform context.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used when preparing expressions for one-shot
    /// evaluation.
    session: &'s dyn SessionMetadata,
}
132
/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans,
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// The finalized dataflow description with `LIR` plans.
    df_desc: LirDataflowDescription,
    /// Dataflow metainfo carried through the optimization stages alongside
    /// the dataflow description.
    df_meta: DataflowMetainfo,
}
146
147impl GlobalLirPlan {
148    pub fn df_desc(&self) -> &LirDataflowDescription {
149        &self.df_desc
150    }
151
152    /// Returns the id of the dataflow's sink export.
153    ///
154    /// # Panics
155    ///
156    /// Panics if the dataflow has no sink exports or has more than one.
157    pub fn sink_id(&self) -> GlobalId {
158        self.df_desc.sink_id()
159    }
160}
161
162impl Optimize<HirRelationExpr> for Optimizer {
163    type To = LocalMirPlan;
164
165    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
166        let time = Instant::now();
167
168        // Trace the pipeline input under `optimize/raw`.
169        trace_plan!(at: "raw", &expr);
170
171        // HIR ⇒ MIR lowering and decorrelation
172        let expr = expr.lower(&self.config, Some(&self.metrics))?;
173
174        // MIR ⇒ MIR optimization (local)
175        let mut df_meta = DataflowMetainfo::default();
176        let mut transform_ctx = TransformCtx::local(
177            &self.config.features,
178            &self.typecheck_ctx,
179            &self.repr_typecheck_ctx,
180            &mut df_meta,
181            Some(&mut self.metrics),
182            Some(self.select_id),
183        );
184        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
185
186        self.duration += time.elapsed();
187
188        // Return the (sealed) plan at the end of this optimization step.
189        Ok(LocalMirPlan {
190            expr,
191            df_meta,
192            context: Unresolved,
193        })
194    }
195}
196
197impl LocalMirPlan<Unresolved> {
198    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
199    /// required for the next stage.
200    pub fn resolve(
201        self,
202        timestamp_ctx: TimestampContext<Timestamp>,
203        session: &dyn SessionMetadata,
204        stats: Box<dyn StatisticsOracle>,
205    ) -> LocalMirPlan<Resolved<'_>> {
206        LocalMirPlan {
207            expr: self.expr,
208            df_meta: self.df_meta,
209            context: Resolved {
210                timestamp_ctx,
211                session,
212                stats,
213            },
214        }
215    }
216}
217
218impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
219    type To = GlobalLirPlan;
220
221    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
222        let time = Instant::now();
223
224        let LocalMirPlan {
225            expr,
226            mut df_meta,
227            context:
228                Resolved {
229                    timestamp_ctx,
230                    stats,
231                    session,
232                },
233        } = plan;
234
235        let expr = OptimizedMirRelationExpr(expr);
236
237        // The assembled dataflow contains a view and a sink on that view.
238        let mut df_builder = {
239            let catalog = self.catalog.state();
240            let compute = self.compute_instance.clone();
241            DataflowBuilder::new(catalog, compute).with_config(&self.config)
242        };
243
244        let debug_name = format!("copy-to-{}", self.select_id);
245        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
246
247        df_builder.import_view_into_dataflow(
248            &self.select_id,
249            &expr,
250            &mut df_desc,
251            &self.config.features,
252        )?;
253        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
254
255        // Creating an S3 sink as currently only s3 sinks are supported. It
256        // might be possible in the future for COPY TO to write to different
257        // sinks, which should be set here depending upon the url scheme.
258        let connection = match &self.copy_to_context.connection {
259            Connection::Aws(aws_connection) => {
260                ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
261                    upload_info: S3UploadInfo {
262                        uri: self.copy_to_context.uri.to_string(),
263                        max_file_size: self.copy_to_context.max_file_size,
264                        desc: self.copy_to_context.desc.clone(),
265                        format: self.copy_to_context.format.clone(),
266                    },
267                    aws_connection: aws_connection.clone(),
268                    connection_id: self.copy_to_context.connection_id,
269                    output_batch_count: self
270                        .copy_to_context
271                        .output_batch_count
272                        .expect("output_batch_count should be set in sequencer"),
273                })
274            }
275            _ => {
276                // Currently only s3 sinks are supported. It was already validated in planning that this
277                // is an aws connection.
278                let msg = "only aws connection is supported in COPY TO";
279                return Err(OptimizerError::Internal(msg.to_string()));
280            }
281        };
282        let sink_description = ComputeSinkDesc {
283            from_desc: self.copy_to_context.desc.clone(),
284            from: self.select_id,
285            connection,
286            with_snapshot: true,
287            // This will get updated  when the GlobalMirPlan is resolved with as_of below.
288            up_to: Default::default(),
289            // No `FORCE NOT NULL` for copy_to.
290            non_null_assertions: Vec::new(),
291            // No `REFRESH` for copy_to.
292            refresh_schedule: None,
293        };
294        df_desc.export_sink(self.select_id, sink_description);
295
296        // Prepare expressions in the assembled dataflow.
297        //
298        // Resolve all unmaterializable function calls except mz_now(), because
299        // we don't yet have a timestamp.
300        let style = ExprPrepStyle::OneShot {
301            logical_time: EvalTime::Deferred,
302            session,
303            catalog_state: self.catalog.state(),
304        };
305        df_desc.visit_children(
306            |r| prep_relation_expr(r, style),
307            |s| prep_scalar_expr(s, style),
308        )?;
309
310        // Set the `as_of` and `until` timestamps for the dataflow.
311        df_desc.set_as_of(timestamp_ctx.antichain());
312
313        // Get the single timestamp representing the `as_of` time.
314        let as_of = df_desc
315            .as_of
316            .clone()
317            .expect("as_of antichain")
318            .into_option()
319            .expect("unique as_of element");
320
321        // Resolve all unmaterializable function calls including mz_now().
322        let style = ExprPrepStyle::OneShot {
323            logical_time: EvalTime::Time(as_of),
324            session,
325            catalog_state: self.catalog.state(),
326        };
327        df_desc.visit_children(
328            |r| prep_relation_expr(r, style),
329            |s| prep_scalar_expr(s, style),
330        )?;
331
332        // Use the opportunity to name an `until` frontier that will prevent
333        // work we needn't perform. By default, `until` will be
334        // `Antichain::new()`, which prevents no updates and is safe.
335        //
336        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
337        // will return `None` and we use the default (empty) `until`. Otherwise,
338        // we expect to be able to set `until = as_of + 1` without an overflow.
339        if let Some(as_of) = timestamp_ctx.timestamp() {
340            if let Some(until) = as_of.checked_add(1) {
341                df_desc.until = Antichain::from_elem(until);
342                // Also updating the sink up_to
343                for (_, sink) in &mut df_desc.sink_exports {
344                    sink.up_to.clone_from(&df_desc.until);
345                }
346            } else {
347                warn!(as_of = %as_of, "as_of + 1 overflow");
348            }
349        }
350
351        // Construct TransformCtx for global optimization.
352        let mut transform_ctx = TransformCtx::global(
353            &df_builder,
354            &*stats,
355            &self.config.features,
356            &self.typecheck_ctx,
357            &self.repr_typecheck_ctx,
358            &mut df_meta,
359            Some(&mut self.metrics),
360        );
361        // Run global optimization.
362        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
363
364        if self.config.mode == OptimizeMode::Explain {
365            // Collect the list of indexes used by the dataflow at this point.
366            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
367        }
368
369        // Ensure all expressions are normalized before finalizing.
370        for build in df_desc.objects_to_build.iter_mut() {
371            normalize_lets(&mut build.plan.0, &self.config.features)?
372        }
373
374        // Finalize the dataflow. This includes:
375        // - MIR ⇒ LIR lowering
376        // - LIR ⇒ LIR transforms
377        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
378
379        // Trace the pipeline output under `optimize`.
380        trace_plan(&df_desc);
381
382        self.duration += time.elapsed();
383        self.metrics
384            .observe_e2e_optimization_time("copy_to", self.duration);
385
386        Ok(GlobalLirPlan { df_desc, df_meta })
387    }
388}
389
390impl GlobalLirPlan {
391    /// Unwraps the parts of the final result of the optimization pipeline.
392    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
393        (self.df_desc, self.df_meta)
394    }
395}