// mz_adapter/optimize/copy_to.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `COPY TO` statements.
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19    ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::warn;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::CopyToContext;
39use crate::optimize::dataflows::{
40    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
41};
42use crate::optimize::{
43    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
44    OptimizerError, optimize_mir_local, trace_plan,
45};
46
/// An optimizer pipeline for `COPY TO` statements.
pub struct Optimizer {
    /// A typechecking context to use throughout the optimizer pipeline.
    typecheck_ctx: SharedTypecheckingContext,
    /// A snapshot of the catalog state.
    catalog: Arc<Catalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used when constructing the dataflow.
    select_id: GlobalId,
    /// Data required to do a COPY TO query.
    copy_to_context: CopyToContext,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    duration: Duration,
}
65
66impl Optimizer {
67    pub fn new(
68        catalog: Arc<Catalog>,
69        compute_instance: ComputeInstanceSnapshot,
70        select_id: GlobalId,
71        copy_to_context: CopyToContext,
72        config: OptimizerConfig,
73        metrics: OptimizerMetrics,
74    ) -> Self {
75        Self {
76            typecheck_ctx: empty_typechecking_context(),
77            catalog,
78            compute_instance,
79            select_id,
80            copy_to_context,
81            config,
82            metrics,
83            duration: Default::default(),
84        }
85    }
86
87    pub fn cluster_id(&self) -> ComputeInstanceId {
88        self.compute_instance.instance_id()
89    }
90}
91
92// A bogey `Debug` implementation that hides fields. This is needed to make the
93// `event!` call in `sequence_peek_stage` not emit a lot of data.
94//
95// For now, we skip almost all fields, but we might revisit that bit if it turns
96// out that we really need those for debugging purposes.
97impl Debug for Optimizer {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("OptimizePeek")
100            .field("config", &self.config)
101            .finish_non_exhaustive()
102    }
103}
104
/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
pub struct Unresolved;
108
/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
/// and MIR optimization.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Dataflow metadata accumulated by the transforms run so far.
    df_meta: DataflowMetainfo,
    /// Either [`Unresolved`] or [`Resolved`] environment context.
    context: T,
}
117
/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    /// The timestamp context used to derive the dataflow's `as_of`/`until`.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// A statistics oracle consulted during global MIR optimization.
    stats: Box<dyn StatisticsOracle>,
    /// Metadata of the session issuing the `COPY TO`, used when resolving
    /// unmaterializable function calls.
    session: &'s dyn SessionMetadata,
}
125
/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans.
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Dataflow metadata accumulated across the whole pipeline.
    df_meta: DataflowMetainfo,
}
139
impl GlobalLirPlan {
    /// Returns a reference to the finalized LIR dataflow description.
    pub fn df_desc(&self) -> &LirDataflowDescription {
        &self.df_desc
    }

    /// Returns the id of the dataflow's sink export.
    ///
    /// # Panics
    ///
    /// Panics if the dataflow has no sink exports or has more than one.
    pub fn sink_id(&self) -> GlobalId {
        self.df_desc.sink_id()
    }
}
154
impl Optimize<HirRelationExpr> for Optimizer {
    type To = LocalMirPlan;

    /// The first (local) optimization phase: HIR ⇒ MIR lowering with
    /// decorrelation, followed by local MIR ⇒ MIR transforms. The result is
    /// sealed as an [`Unresolved`] [`LocalMirPlan`] until context is attached
    /// via [`LocalMirPlan::resolve`].
    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
        // Track wall-clock time; accumulated into `self.duration` below and
        // reported to metrics at the end of the second phase.
        let time = Instant::now();

        // Trace the pipeline input under `optimize/raw`.
        trace_plan!(at: "raw", &expr);

        // HIR ⇒ MIR lowering and decorrelation
        let expr = expr.lower(&self.config, Some(&self.metrics))?;

        // MIR ⇒ MIR optimization (local)
        let mut df_meta = DataflowMetainfo::default();
        let mut transform_ctx = TransformCtx::local(
            &self.config.features,
            &self.typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
            Some(self.select_id),
        );
        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(LocalMirPlan {
            expr,
            df_meta,
            context: Unresolved,
        })
    }
}
188
189impl LocalMirPlan<Unresolved> {
190    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
191    /// required for the next stage.
192    pub fn resolve(
193        self,
194        timestamp_ctx: TimestampContext<Timestamp>,
195        session: &dyn SessionMetadata,
196        stats: Box<dyn StatisticsOracle>,
197    ) -> LocalMirPlan<Resolved<'_>> {
198        LocalMirPlan {
199            expr: self.expr,
200            df_meta: self.df_meta,
201            context: Resolved {
202                timestamp_ctx,
203                session,
204                stats,
205            },
206        }
207    }
208}
209
210impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
211    type To = GlobalLirPlan;
212
213    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
214        let time = Instant::now();
215
216        let LocalMirPlan {
217            expr,
218            mut df_meta,
219            context:
220                Resolved {
221                    timestamp_ctx,
222                    stats,
223                    session,
224                },
225        } = plan;
226
227        let expr = OptimizedMirRelationExpr(expr);
228
229        // The assembled dataflow contains a view and a sink on that view.
230        let mut df_builder = {
231            let catalog = self.catalog.state();
232            let compute = self.compute_instance.clone();
233            DataflowBuilder::new(catalog, compute).with_config(&self.config)
234        };
235
236        let debug_name = format!("copy-to-{}", self.select_id);
237        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
238
239        df_builder.import_view_into_dataflow(
240            &self.select_id,
241            &expr,
242            &mut df_desc,
243            &self.config.features,
244        )?;
245        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
246
247        // Creating an S3 sink as currently only s3 sinks are supported. It
248        // might be possible in the future for COPY TO to write to different
249        // sinks, which should be set here depending upon the url scheme.
250        let connection = match &self.copy_to_context.connection {
251            Connection::Aws(aws_connection) => {
252                ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
253                    upload_info: S3UploadInfo {
254                        uri: self.copy_to_context.uri.to_string(),
255                        max_file_size: self.copy_to_context.max_file_size,
256                        desc: self.copy_to_context.desc.clone(),
257                        format: self.copy_to_context.format.clone(),
258                    },
259                    aws_connection: aws_connection.clone(),
260                    connection_id: self.copy_to_context.connection_id,
261                    output_batch_count: self
262                        .copy_to_context
263                        .output_batch_count
264                        .expect("output_batch_count should be set in sequencer"),
265                })
266            }
267            _ => {
268                // Currently only s3 sinks are supported. It was already validated in planning that this
269                // is an aws connection.
270                let msg = "only aws connection is supported in COPY TO";
271                return Err(OptimizerError::Internal(msg.to_string()));
272            }
273        };
274        let sink_description = ComputeSinkDesc {
275            from_desc: self.copy_to_context.desc.clone(),
276            from: self.select_id,
277            connection,
278            with_snapshot: true,
279            // This will get updated  when the GlobalMirPlan is resolved with as_of below.
280            up_to: Default::default(),
281            // No `FORCE NOT NULL` for copy_to.
282            non_null_assertions: Vec::new(),
283            // No `REFRESH` for copy_to.
284            refresh_schedule: None,
285        };
286        df_desc.export_sink(self.select_id, sink_description);
287
288        // Set the `as_of` and `until` timestamps for the dataflow.
289        df_desc.set_as_of(timestamp_ctx.antichain());
290
291        // Get the single timestamp representing the `as_of` time.
292        let as_of = df_desc
293            .as_of
294            .clone()
295            .expect("as_of antichain")
296            .into_option()
297            .expect("unique as_of element");
298
299        // Resolve all unmaterializable function calls including mz_now().
300        let style = ExprPrepOneShot {
301            logical_time: EvalTime::Time(as_of),
302            session,
303            catalog_state: self.catalog.state(),
304        };
305        df_desc.visit_children(
306            |r| style.prep_relation_expr(r),
307            |s| style.prep_scalar_expr(s),
308        )?;
309
310        // Use the opportunity to name an `until` frontier that will prevent
311        // work we needn't perform. By default, `until` will be
312        // `Antichain::new()`, which prevents no updates and is safe.
313        //
314        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
315        // will return `None` and we use the default (empty) `until`. Otherwise,
316        // we expect to be able to set `until = as_of + 1` without an overflow.
317        if let Some(as_of) = timestamp_ctx.timestamp() {
318            if let Some(until) = as_of.checked_add(1) {
319                df_desc.until = Antichain::from_elem(until);
320                // Also updating the sink up_to
321                for (_, sink) in &mut df_desc.sink_exports {
322                    sink.up_to.clone_from(&df_desc.until);
323                }
324            } else {
325                warn!(as_of = %as_of, "as_of + 1 overflow");
326            }
327        }
328
329        // Construct TransformCtx for global optimization.
330        let mut transform_ctx = TransformCtx::global(
331            &df_builder,
332            &*stats,
333            &self.config.features,
334            &self.typecheck_ctx,
335            &mut df_meta,
336            Some(&mut self.metrics),
337        );
338        // Run global optimization.
339        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
340
341        if self.config.mode == OptimizeMode::Explain {
342            // Collect the list of indexes used by the dataflow at this point.
343            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
344        }
345
346        // Ensure all expressions are normalized before finalizing.
347        for build in df_desc.objects_to_build.iter_mut() {
348            normalize_lets(&mut build.plan.0, &self.config.features)?
349        }
350
351        // Finalize the dataflow. This includes:
352        // - MIR ⇒ LIR lowering
353        // - LIR ⇒ LIR transforms
354        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
355
356        // Trace the pipeline output under `optimize`.
357        trace_plan(&df_desc);
358
359        self.duration += time.elapsed();
360        self.metrics
361            .observe_e2e_optimization_time("copy_to", self.duration);
362
363        Ok(GlobalLirPlan { df_desc, df_meta })
364    }
365}
366
367impl GlobalLirPlan {
368    /// Unwraps the parts of the final result of the optimization pipeline.
369    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
370        (self.df_desc, self.df_meta)
371    }
372}