mz_adapter/optimize/copy_to.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `COPY TO` statements.
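//!
//! Optimization proceeds in two phases: a context-free phase that lowers the
//! query's `HIR` plan to `MIR` and applies local `MIR` transforms (producing
//! a [`LocalMirPlan`]), and a context-sensitive phase that, once timestamps
//! and session context are resolved, assembles, globally optimizes, and
//! finalizes the dataflow (producing a [`GlobalLirPlan`]).
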
11
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19    ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::{StatisticsOracle, TransformCtx};
35use timely::progress::Antichain;
36use tracing::warn;
37
38use crate::TimestampContext;
39use crate::catalog::Catalog;
40use crate::coord::CopyToContext;
41use crate::optimize::dataflows::{
42    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
43    prep_scalar_expr,
44};
45use crate::optimize::{
46    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
47    OptimizerError, optimize_mir_local, trace_plan,
48};
49
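/// An optimizer for `COPY TO` statements.
///
/// The two [`Optimize`] implementations below are meant to be called in
/// sequence: first on the query's [`HirRelationExpr`], yielding a
/// [`LocalMirPlan`], and then, after [`LocalMirPlan::resolve`] has attached
/// the execution context, on the resolved plan, yielding a [`GlobalLirPlan`].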
pub struct Optimizer {
    /// A representation typechecking context to use throughout the optimizer pipeline.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<Catalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used when constructing the dataflow.
    select_id: GlobalId,
    /// Data required to execute a `COPY TO` query.
    copy_to_context: CopyToContext,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    duration: Duration,
}

impl Optimizer {
    pub fn new(
        catalog: Arc<Catalog>,
        compute_instance: ComputeInstanceSnapshot,
        select_id: GlobalId,
        copy_to_context: CopyToContext,
        config: OptimizerConfig,
        metrics: OptimizerMetrics,
    ) -> Self {
        Self {
            repr_typecheck_ctx: empty_repr_context(),
            catalog,
            compute_instance,
            select_id,
            copy_to_context,
            config,
            metrics,
            duration: Default::default(),
        }
    }

    pub fn cluster_id(&self) -> ComputeInstanceId {
        self.compute_instance.instance_id()
    }
}

// A bogus `Debug` implementation that hides fields. This is needed to make the
// `event!` call in `sequence_peek_stage` not emit a lot of data.
//
// For now, we skip almost all fields, but we might revisit that bit if it turns
// out that we really need those for debugging purposes.
impl Debug for Optimizer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OptimizeCopyTo")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
pub struct Unresolved;

/// The (sealed) intermediate result after HIR ⇒ MIR lowering, decorrelation,
/// and local MIR optimization.
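///
/// The type parameter `T` follows the typestate pattern: a plan starts out
/// as `LocalMirPlan<Unresolved>` and becomes `LocalMirPlan<Resolved>` once
/// [`LocalMirPlan::resolve`] attaches the timestamp, session, and statistics
/// context required by the second optimization stage.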
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    expr: MirRelationExpr,
    df_meta: DataflowMetainfo,
    context: T,
}

/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    timestamp_ctx: TimestampContext<Timestamp>,
    stats: Box<dyn StatisticsOracle>,
    session: &'s dyn SessionMetadata,
}

/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans,
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
#[derive(Debug)]
pub struct GlobalLirPlan {
    df_desc: LirDataflowDescription,
    df_meta: DataflowMetainfo,
}

impl GlobalLirPlan {
    pub fn df_desc(&self) -> &LirDataflowDescription {
        &self.df_desc
    }

    /// Returns the id of the dataflow's sink export.
    ///
    /// # Panics
    ///
    /// Panics if the dataflow has no sink exports or has more than one.
    pub fn sink_id(&self) -> GlobalId {
        self.df_desc.sink_id()
    }
}

impl Optimize<HirRelationExpr> for Optimizer {
    type To = LocalMirPlan;

    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        // Trace the pipeline input under `optimize/raw`.
        trace_plan!(at: "raw", &expr);

        // HIR ⇒ MIR lowering and decorrelation
        let expr = expr.lower(&self.config, Some(&self.metrics))?;

        // MIR ⇒ MIR optimization (local)
        let mut df_meta = DataflowMetainfo::default();
        let mut transform_ctx = TransformCtx::local(
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
            Some(self.select_id),
        );
        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(LocalMirPlan {
            expr,
            df_meta,
            context: Unresolved,
        })
    }
}

impl LocalMirPlan<Unresolved> {
    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
    /// required for the next stage.
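    ///
    /// A minimal sketch of the intended call sequence (here `timestamp_ctx`,
    /// `session`, and `stats` stand in for values produced by the coordinator
    /// during sequencing):
    ///
    /// ```ignore
    /// let local_plan = optimizer.optimize(hir_expr)?;
    /// let resolved = local_plan.resolve(timestamp_ctx, &session, stats);
    /// let global_lir_plan = optimizer.optimize(resolved)?;
    /// ```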
    pub fn resolve(
        self,
        timestamp_ctx: TimestampContext<Timestamp>,
        session: &dyn SessionMetadata,
        stats: Box<dyn StatisticsOracle>,
    ) -> LocalMirPlan<Resolved<'_>> {
        LocalMirPlan {
            expr: self.expr,
            df_meta: self.df_meta,
            context: Resolved {
                timestamp_ctx,
                session,
                stats,
            },
        }
    }
}

impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
    type To = GlobalLirPlan;

    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let LocalMirPlan {
            expr,
            mut df_meta,
            context:
                Resolved {
                    timestamp_ctx,
                    stats,
                    session,
                },
        } = plan;

        let expr = OptimizedMirRelationExpr(expr);

        // The assembled dataflow contains a view and a sink on that view.
        let mut df_builder = {
            let catalog = self.catalog.state();
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(catalog, compute).with_config(&self.config)
        };

        let debug_name = format!("copy-to-{}", self.select_id);
        let mut df_desc = MirDataflowDescription::new(debug_name);

        df_builder.import_view_into_dataflow(
            &self.select_id,
            &expr,
            &mut df_desc,
            &self.config.features,
        )?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // Create an S3 sink, as only S3 sinks are currently supported. COPY TO
        // might support other sink types in the future, in which case the
        // connection should be chosen here based on the URI scheme.
        let connection = match &self.copy_to_context.connection {
            Connection::Aws(aws_connection) => {
                ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
                    upload_info: S3UploadInfo {
                        uri: self.copy_to_context.uri.to_string(),
                        max_file_size: self.copy_to_context.max_file_size,
                        desc: self.copy_to_context.desc.clone(),
                        format: self.copy_to_context.format.clone(),
                    },
                    aws_connection: aws_connection.clone(),
                    connection_id: self.copy_to_context.connection_id,
                    output_batch_count: self
                        .copy_to_context
                        .output_batch_count
                        .expect("output_batch_count should be set in sequencer"),
                })
            }
            _ => {
                // Only S3 sinks are currently supported; planning has already
                // validated that this is an AWS connection.
                let msg = "only AWS connections are supported in COPY TO";
                return Err(OptimizerError::Internal(msg.to_string()));
            }
        };
        let sink_description = ComputeSinkDesc {
            from_desc: self.copy_to_context.desc.clone(),
            from: self.select_id,
            connection,
            with_snapshot: true,
            // This will get updated below, together with `until`, once the
            // `as_of` is resolved.
            up_to: Default::default(),
            // No `FORCE NOT NULL` for copy_to.
            non_null_assertions: Vec::new(),
            // No `REFRESH` for copy_to.
            refresh_schedule: None,
        };
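        // Register the sink as the dataflow's single export. The transient
        // `select_id` doubles as the sink's id and as the id of the view the
        // sink reads from.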
        df_desc.export_sink(self.select_id, sink_description);

        // Prepare expressions in the assembled dataflow.
        //
        // Resolve all unmaterializable function calls except mz_now(), because
        // we don't yet have a timestamp.
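        // (Unmaterializable functions are those whose output depends on the
        // evaluation context rather than on their inputs, e.g. `mz_now()` or
        // `current_user()`.)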
        let style = ExprPrepStyle::OneShot {
            logical_time: EvalTime::Deferred,
            session,
            catalog_state: self.catalog.state(),
        };
        df_desc.visit_children(
            |r| prep_relation_expr(r, style),
            |s| prep_scalar_expr(s, style),
        )?;

        // Set the `as_of` timestamp for the dataflow. (The `until` frontier is
        // set further below.)
        df_desc.set_as_of(timestamp_ctx.antichain());

        // Get the single timestamp representing the `as_of` time.
        let as_of = df_desc
            .as_of
            .clone()
            .expect("as_of antichain")
            .into_option()
            .expect("unique as_of element");

        // Resolve all unmaterializable function calls including mz_now().
        let style = ExprPrepStyle::OneShot {
            logical_time: EvalTime::Time(as_of),
            session,
            catalog_state: self.catalog.state(),
        };
        df_desc.visit_children(
            |r| prep_relation_expr(r, style),
            |s| prep_scalar_expr(s, style),
        )?;

        // Use the opportunity to name an `until` frontier that will prevent
        // work we needn't perform. By default, `until` will be
        // `Antichain::new()`, which prevents no updates and is safe.
        //
        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
        // will return `None` and we use the default (empty) `until`. Otherwise,
        // we expect to be able to set `until = as_of + 1` without an overflow.
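        //
        // Because `COPY TO` drives a oneshot sink that only emits the
        // snapshot as of the query timestamp, no updates strictly beyond
        // `as_of` are needed, so `until = as_of + 1` loses nothing.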
        if let Some(as_of) = timestamp_ctx.timestamp() {
            if let Some(until) = as_of.checked_add(1) {
                df_desc.until = Antichain::from_elem(until);
                // Also update the sinks' `up_to` frontiers to match.
                for (_, sink) in &mut df_desc.sink_exports {
                    sink.up_to.clone_from(&df_desc.until);
                }
            } else {
                warn!(as_of = %as_of, "as_of + 1 overflow");
            }
        }

        // Construct TransformCtx for global optimization.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &*stats,
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );
        // Run global optimization.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        if self.config.mode == OptimizeMode::Explain {
            // Collect the list of indexes used by the dataflow at this point.
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        // Ensure all expressions are normalized before finalizing.
        for build in df_desc.objects_to_build.iter_mut() {
            normalize_lets(&mut build.plan.0, &self.config.features)?;
        }

        // Finalize the dataflow. This includes:
        // - MIR ⇒ LIR lowering
        // - LIR ⇒ LIR transforms
        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

        // Trace the pipeline output under `optimize`.
        trace_plan(&df_desc);

        self.duration += time.elapsed();
        self.metrics
            .observe_e2e_optimization_time("copy_to", self.duration);

        Ok(GlobalLirPlan { df_desc, df_meta })
    }
}

impl GlobalLirPlan {
    /// Unwraps the parts of the final result of the optimization pipeline.
    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
        (self.df_desc, self.df_meta)
    }
}