mz_adapter/optimize/copy_to.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `COPY TO` statements.
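//!
//! Optimization is staged. A first [`Optimize::optimize`] call lowers the
//! [`HirRelationExpr`] to a [`LocalMirPlan`], [`LocalMirPlan::resolve`] then
//! attaches the timestamp, session, and statistics context, and a second
//! [`Optimize::optimize`] call produces the final [`GlobalLirPlan`]:
//!
//! ```text
//! HirRelationExpr ──optimize──▶ LocalMirPlan<Unresolved>
//!                 ──resolve───▶ LocalMirPlan<Resolved>
//!                 ──optimize──▶ GlobalLirPlan
//! ```
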
11
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19    ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::reprtypecheck::{
32    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
33};
34use mz_transform::{StatisticsOracle, TransformCtx};
35use timely::progress::Antichain;
36use tracing::warn;
37
38use crate::TimestampContext;
39use crate::catalog::Catalog;
40use crate::coord::CopyToContext;
41use crate::optimize::dataflows::{
42    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
43};
44use crate::optimize::{
45    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
46    OptimizerError, optimize_mir_local, trace_plan,
47};
48
49pub struct Optimizer {
50    /// A representation typechecking context to use throughout the optimizer pipeline.
51    repr_typecheck_ctx: ReprTypecheckContext,
52    /// A snapshot of the catalog state.
53    catalog: Arc<Catalog>,
54    /// A snapshot of the cluster that will run the dataflows.
55    compute_instance: ComputeInstanceSnapshot,
56    /// A transient GlobalId to be used when constructing the dataflow.
57    select_id: GlobalId,
58    /// Data required to do a COPY TO query.
59    copy_to_context: CopyToContext,
60    /// Optimizer config.
61    config: OptimizerConfig,
62    /// Optimizer metrics.
63    metrics: OptimizerMetrics,
64    /// The time spent performing optimization so far.
65    duration: Duration,
66}
67
68impl Optimizer {
69    pub fn new(
70        catalog: Arc<Catalog>,
71        compute_instance: ComputeInstanceSnapshot,
72        select_id: GlobalId,
73        copy_to_context: CopyToContext,
74        config: OptimizerConfig,
75        metrics: OptimizerMetrics,
76    ) -> Self {
77        Self {
78            repr_typecheck_ctx: empty_repr_context(),
79            catalog,
80            compute_instance,
81            select_id,
82            copy_to_context,
83            config,
84            metrics,
85            duration: Default::default(),
86        }
87    }
88
89    pub fn cluster_id(&self) -> ComputeInstanceId {
90        self.compute_instance.instance_id()
91    }
92}
93
// A bogus `Debug` implementation that hides fields. This is needed to make the
// `event!` call in `sequence_peek_stage` not emit a lot of data.
//
// For now, we skip almost all fields, but we might revisit that bit if it turns
// out that we really need those for debugging purposes.
impl Debug for Optimizer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OptimizeCopyTo")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
pub struct Unresolved;

/// The (sealed intermediate) result after HIR ⇒ MIR lowering, decorrelation,
/// and local MIR optimization.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    expr: MirRelationExpr,
    df_meta: DataflowMetainfo,
    context: T,
}

/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    timestamp_ctx: TimestampContext<Timestamp>,
    stats: Box<dyn StatisticsOracle>,
    session: &'s dyn SessionMetadata,
}

/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans,
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
#[derive(Debug)]
pub struct GlobalLirPlan {
    df_desc: LirDataflowDescription,
    df_meta: DataflowMetainfo,
}

impl GlobalLirPlan {
    pub fn df_desc(&self) -> &LirDataflowDescription {
        &self.df_desc
    }

    /// Returns the id of the dataflow's sink export.
    ///
    /// # Panics
    ///
    /// Panics if the dataflow has no sink exports or has more than one.
    pub fn sink_id(&self) -> GlobalId {
        self.df_desc.sink_id()
    }
}

impl Optimize<HirRelationExpr> for Optimizer {
    type To = LocalMirPlan;

    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        // Trace the pipeline input under `optimize/raw`.
        trace_plan!(at: "raw", &expr);

        // HIR ⇒ MIR lowering and decorrelation
        let expr = expr.lower(&self.config, Some(&self.metrics))?;

        // MIR ⇒ MIR optimization (local)
        let mut df_meta = DataflowMetainfo::default();
        let mut transform_ctx = TransformCtx::local(
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
            Some(self.select_id),
        );
        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(LocalMirPlan {
            expr,
            df_meta,
            context: Unresolved,
        })
    }
}

impl LocalMirPlan<Unresolved> {
    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
    /// required for the next stage.
    pub fn resolve(
        self,
        timestamp_ctx: TimestampContext<Timestamp>,
        session: &dyn SessionMetadata,
        stats: Box<dyn StatisticsOracle>,
    ) -> LocalMirPlan<Resolved<'_>> {
        LocalMirPlan {
            expr: self.expr,
            df_meta: self.df_meta,
            context: Resolved {
                timestamp_ctx,
                session,
                stats,
            },
        }
    }
}
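
// A minimal sketch of the intended two-phase call sequence (variable names
// here are hypothetical; only the types and methods come from this module):
//
//     let local = optimizer.optimize(hir_expr)?;   // -> LocalMirPlan<Unresolved>
//     let resolved = local.resolve(timestamp_ctx, session, stats);
//     let global = optimizer.optimize(resolved)?;  // -> GlobalLirPlan
//     let (df_desc, df_meta) = global.unapply();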

impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
    type To = GlobalLirPlan;

    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let LocalMirPlan {
            expr,
            mut df_meta,
            context:
                Resolved {
                    timestamp_ctx,
                    stats,
                    session,
                },
        } = plan;

        let expr = OptimizedMirRelationExpr(expr);

        // The assembled dataflow contains a view and a sink on that view.
        let mut df_builder = {
            let catalog = self.catalog.state();
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(catalog, compute).with_config(&self.config)
        };

        let debug_name = format!("copy-to-{}", self.select_id);
        let mut df_desc = MirDataflowDescription::new(debug_name);

        df_builder.import_view_into_dataflow(
            &self.select_id,
            &expr,
            &mut df_desc,
            &self.config.features,
        )?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // Create an S3 sink, as currently only S3 sinks are supported. COPY TO
        // may support other sink types in the future, in which case the
        // connection should be chosen here based on the URI scheme.
        let connection = match &self.copy_to_context.connection {
            Connection::Aws(aws_connection) => {
                ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
                    upload_info: S3UploadInfo {
                        uri: self.copy_to_context.uri.to_string(),
                        max_file_size: self.copy_to_context.max_file_size,
                        desc: self.copy_to_context.desc.clone(),
                        format: self.copy_to_context.format.clone(),
                    },
                    aws_connection: aws_connection.clone(),
                    connection_id: self.copy_to_context.connection_id,
                    output_batch_count: self
                        .copy_to_context
                        .output_batch_count
                        .expect("output_batch_count should be set in sequencer"),
                })
            }
            _ => {
                // Currently only S3 sinks are supported. Planning has already
                // validated that this is an AWS connection.
                let msg = "only AWS connections are supported in COPY TO";
                return Err(OptimizerError::Internal(msg.to_string()));
            }
        };
        let sink_description = ComputeSinkDesc {
            from_desc: self.copy_to_context.desc.clone(),
            from: self.select_id,
            connection,
            with_snapshot: true,
            // This is updated below, once the `as_of` has been determined.
            up_to: Default::default(),
            // No `FORCE NOT NULL` for copy_to.
            non_null_assertions: Vec::new(),
            // No `REFRESH` for copy_to.
            refresh_schedule: None,
        };
        df_desc.export_sink(self.select_id, sink_description);

        // Prepare expressions in the assembled dataflow.
        //
        // Resolve all unmaterializable function calls except mz_now(), because
        // we don't yet have a timestamp.
        let style = ExprPrepOneShot {
            logical_time: EvalTime::Deferred,
            session,
            catalog_state: self.catalog.state(),
        };
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;
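
        // Note: with `EvalTime::Deferred`, `mz_now()` calls are left in place
        // by this pass; they are replaced with the concrete `as_of` timestamp
        // by the second prep pass below.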

        // Set the `as_of` timestamp for the dataflow (the `until` frontier is
        // derived from it further below).
        df_desc.set_as_of(timestamp_ctx.antichain());

        // Get the single timestamp representing the `as_of` time. A one-shot
        // COPY TO reads at exactly one time, so the `as_of` antichain is
        // expected to contain exactly one element.
        let as_of = df_desc
            .as_of
            .clone()
            .expect("as_of antichain")
            .into_option()
            .expect("unique as_of element");

        // Resolve all unmaterializable function calls including mz_now().
        let style = ExprPrepOneShot {
            logical_time: EvalTime::Time(as_of),
            session,
            catalog_state: self.catalog.state(),
        };
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // Use the opportunity to name an `until` frontier that will prevent
        // work we needn't perform. By default, `until` will be
        // `Antichain::new()`, which prevents no updates and is safe.
        //
        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
        // will return `None` and we use the default (empty) `until`. Otherwise,
        // we expect to be able to set `until = as_of + 1` without an overflow.
        if let Some(as_of) = timestamp_ctx.timestamp() {
            if let Some(until) = as_of.checked_add(1) {
                df_desc.until = Antichain::from_elem(until);
                // Also update the sink `up_to` frontiers.
                for (_, sink) in &mut df_desc.sink_exports {
                    sink.up_to.clone_from(&df_desc.until);
                }
            } else {
                warn!(as_of = %as_of, "as_of + 1 overflow");
            }
        }
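
        // Illustrative example: with `as_of = 100` we set `until = 101` (and
        // the sink's `up_to = 101`), declaring that updates at times >= 101
        // are not needed; this one-shot sink only reads the snapshot as of
        // time 100.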

        // Construct a TransformCtx for global optimization.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &*stats,
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );
        // Run global optimization.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        if self.config.mode == OptimizeMode::Explain {
            // Collect the list of indexes used by the dataflow at this point.
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        // Ensure all expressions are normalized before finalizing.
        for build in df_desc.objects_to_build.iter_mut() {
            normalize_lets(&mut build.plan.0, &self.config.features)?;
        }

        // Finalize the dataflow. This includes:
        // - MIR ⇒ LIR lowering
        // - LIR ⇒ LIR transforms
        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

        // Trace the pipeline output under `optimize`.
        trace_plan(&df_desc);

        self.duration += time.elapsed();
        self.metrics
            .observe_e2e_optimization_time("copy_to", self.duration);

        Ok(GlobalLirPlan { df_desc, df_meta })
    }
}

impl GlobalLirPlan {
    /// Unwraps the parts of the final result of the optimization pipeline.
    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
        (self.df_desc, self.df_meta)
    }
}