mz_adapter/optimize/
copy_to.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `COPY TO` statements.
11
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::plan::Plan;
18use mz_compute_types::sinks::{
19    ComputeSinkConnection, ComputeSinkDesc, CopyToS3OneshotSinkConnection,
20};
21use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
22use mz_repr::explain::trace_plan;
23use mz_repr::{GlobalId, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::HirRelationExpr;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_storage_types::connections::Connection;
28use mz_storage_types::sinks::S3UploadInfo;
29use mz_transform::dataflow::DataflowMetainfo;
30use mz_transform::normalize_lets::normalize_lets;
31use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::warn;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::CopyToContext;
39use crate::optimize::dataflows::{
40    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
41    prep_scalar_expr,
42};
43use crate::optimize::{
44    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig,
45    OptimizerError, optimize_mir_local, trace_plan,
46};
47
48pub struct Optimizer {
49    /// A typechecking context to use throughout the optimizer pipeline.
50    typecheck_ctx: TypecheckContext,
51    /// A snapshot of the catalog state.
52    catalog: Arc<Catalog>,
53    /// A snapshot of the cluster that will run the dataflows.
54    compute_instance: ComputeInstanceSnapshot,
55    /// A transient GlobalId to be used when constructing the dataflow.
56    select_id: GlobalId,
57    /// Data required to do a COPY TO query.
58    copy_to_context: CopyToContext,
59    /// Optimizer config.
60    config: OptimizerConfig,
61    /// Optimizer metrics.
62    metrics: OptimizerMetrics,
63    /// The time spent performing optimization so far.
64    duration: Duration,
65}
66
67impl Optimizer {
68    pub fn new(
69        catalog: Arc<Catalog>,
70        compute_instance: ComputeInstanceSnapshot,
71        select_id: GlobalId,
72        copy_to_context: CopyToContext,
73        config: OptimizerConfig,
74        metrics: OptimizerMetrics,
75    ) -> Self {
76        Self {
77            typecheck_ctx: empty_context(),
78            catalog,
79            compute_instance,
80            select_id,
81            copy_to_context,
82            config,
83            metrics,
84            duration: Default::default(),
85        }
86    }
87
88    pub fn cluster_id(&self) -> ComputeInstanceId {
89        self.compute_instance.instance_id()
90    }
91}
92
93// A bogey `Debug` implementation that hides fields. This is needed to make the
94// `event!` call in `sequence_peek_stage` not emit a lot of data.
95//
96// For now, we skip almost all fields, but we might revisit that bit if it turns
97// out that we really need those for debugging purposes.
98impl Debug for Optimizer {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        f.debug_struct("OptimizePeek")
101            .field("config", &self.config)
102            .finish_non_exhaustive()
103    }
104}
105
106/// Marker type for [`LocalMirPlan`] representing an optimization result without
107/// context.
108pub struct Unresolved;
109
110/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
111/// and MIR optimization.
112#[derive(Clone)]
113pub struct LocalMirPlan<T = Unresolved> {
114    expr: MirRelationExpr,
115    df_meta: DataflowMetainfo,
116    context: T,
117}
118
119/// Marker type for [`LocalMirPlan`] structs representing an optimization result
120/// with attached environment context required for the next optimization stage.
121pub struct Resolved<'s> {
122    timestamp_ctx: TimestampContext<Timestamp>,
123    stats: Box<dyn StatisticsOracle>,
124    session: &'s dyn SessionMetadata,
125}
126
127/// The (final) result after
128///
129/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
130/// 2. transitively inlining referenced views,
131/// 3. timestamp resolution,
132/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans.
133/// 5. MIR ⇒ LIR lowering, and
134/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
135#[derive(Debug)]
136pub struct GlobalLirPlan {
137    df_desc: LirDataflowDescription,
138    df_meta: DataflowMetainfo,
139}
140
141impl GlobalLirPlan {
142    pub fn df_desc(&self) -> &LirDataflowDescription {
143        &self.df_desc
144    }
145
146    pub fn sink_id(&self) -> GlobalId {
147        let sink_exports = &self.df_desc.sink_exports;
148        let sink_id = sink_exports.keys().next().expect("valid sink");
149        *sink_id
150    }
151}
152
153impl Optimize<HirRelationExpr> for Optimizer {
154    type To = LocalMirPlan;
155
156    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
157        let time = Instant::now();
158
159        // Trace the pipeline input under `optimize/raw`.
160        trace_plan!(at: "raw", &expr);
161
162        // HIR ⇒ MIR lowering and decorrelation
163        let expr = expr.lower(&self.config, Some(&self.metrics))?;
164
165        // MIR ⇒ MIR optimization (local)
166        let mut df_meta = DataflowMetainfo::default();
167        let mut transform_ctx = TransformCtx::local(
168            &self.config.features,
169            &self.typecheck_ctx,
170            &mut df_meta,
171            Some(&self.metrics),
172            Some(self.select_id),
173        );
174        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
175
176        self.duration += time.elapsed();
177
178        // Return the (sealed) plan at the end of this optimization step.
179        Ok(LocalMirPlan {
180            expr,
181            df_meta,
182            context: Unresolved,
183        })
184    }
185}
186
187impl LocalMirPlan<Unresolved> {
188    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
189    /// required for the next stage.
190    pub fn resolve(
191        self,
192        timestamp_ctx: TimestampContext<Timestamp>,
193        session: &dyn SessionMetadata,
194        stats: Box<dyn StatisticsOracle>,
195    ) -> LocalMirPlan<Resolved<'_>> {
196        LocalMirPlan {
197            expr: self.expr,
198            df_meta: self.df_meta,
199            context: Resolved {
200                timestamp_ctx,
201                session,
202                stats,
203            },
204        }
205    }
206}
207
208impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
209    type To = GlobalLirPlan;
210
211    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
212        let time = Instant::now();
213
214        let LocalMirPlan {
215            expr,
216            mut df_meta,
217            context:
218                Resolved {
219                    timestamp_ctx,
220                    stats,
221                    session,
222                },
223        } = plan;
224
225        let expr = OptimizedMirRelationExpr(expr);
226
227        // The assembled dataflow contains a view and a sink on that view.
228        let mut df_builder = {
229            let catalog = self.catalog.state();
230            let compute = self.compute_instance.clone();
231            DataflowBuilder::new(catalog, compute).with_config(&self.config)
232        };
233
234        let debug_name = format!("copy-to-{}", self.select_id);
235        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
236
237        df_builder.import_view_into_dataflow(
238            &self.select_id,
239            &expr,
240            &mut df_desc,
241            &self.config.features,
242        )?;
243        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
244
245        // Creating an S3 sink as currently only s3 sinks are supported. It
246        // might be possible in the future for COPY TO to write to different
247        // sinks, which should be set here depending upon the url scheme.
248        let connection = match &self.copy_to_context.connection {
249            Connection::Aws(aws_connection) => {
250                ComputeSinkConnection::CopyToS3Oneshot(CopyToS3OneshotSinkConnection {
251                    upload_info: S3UploadInfo {
252                        uri: self.copy_to_context.uri.to_string(),
253                        max_file_size: self.copy_to_context.max_file_size,
254                        desc: self.copy_to_context.desc.clone(),
255                        format: self.copy_to_context.format.clone(),
256                    },
257                    aws_connection: aws_connection.clone(),
258                    connection_id: self.copy_to_context.connection_id,
259                    output_batch_count: self
260                        .copy_to_context
261                        .output_batch_count
262                        .expect("output_batch_count should be set in sequencer"),
263                })
264            }
265            _ => {
266                // Currently only s3 sinks are supported. It was already validated in planning that this
267                // is an aws connection.
268                let msg = "only aws connection is supported in COPY TO";
269                return Err(OptimizerError::Internal(msg.to_string()));
270            }
271        };
272        let sink_description = ComputeSinkDesc {
273            from_desc: self.copy_to_context.desc.clone(),
274            from: self.select_id,
275            connection,
276            with_snapshot: true,
277            // This will get updated  when the GlobalMirPlan is resolved with as_of below.
278            up_to: Default::default(),
279            // No `FORCE NOT NULL` for copy_to.
280            non_null_assertions: Vec::new(),
281            // No `REFRESH` for copy_to.
282            refresh_schedule: None,
283        };
284        df_desc.export_sink(self.select_id, sink_description);
285
286        // Prepare expressions in the assembled dataflow.
287        //
288        // Resolve all unmaterializable function calls except mz_now(), because
289        // we don't yet have a timestamp.
290        let style = ExprPrepStyle::OneShot {
291            logical_time: EvalTime::Deferred,
292            session,
293            catalog_state: self.catalog.state(),
294        };
295        df_desc.visit_children(
296            |r| prep_relation_expr(r, style),
297            |s| prep_scalar_expr(s, style),
298        )?;
299
300        // Set the `as_of` and `until` timestamps for the dataflow.
301        df_desc.set_as_of(timestamp_ctx.antichain());
302
303        // Get the single timestamp representing the `as_of` time.
304        let as_of = df_desc
305            .as_of
306            .clone()
307            .expect("as_of antichain")
308            .into_option()
309            .expect("unique as_of element");
310
311        // Resolve all unmaterializable function calls including mz_now().
312        let style = ExprPrepStyle::OneShot {
313            logical_time: EvalTime::Time(as_of),
314            session,
315            catalog_state: self.catalog.state(),
316        };
317        df_desc.visit_children(
318            |r| prep_relation_expr(r, style),
319            |s| prep_scalar_expr(s, style),
320        )?;
321
322        // Use the opportunity to name an `until` frontier that will prevent
323        // work we needn't perform. By default, `until` will be
324        // `Antichain::new()`, which prevents no updates and is safe.
325        //
326        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
327        // will return `None` and we use the default (empty) `until`. Otherwise,
328        // we expect to be able to set `until = as_of + 1` without an overflow.
329        if let Some(as_of) = timestamp_ctx.timestamp() {
330            if let Some(until) = as_of.checked_add(1) {
331                df_desc.until = Antichain::from_elem(until);
332                // Also updating the sink up_to
333                for (_, sink) in &mut df_desc.sink_exports {
334                    sink.up_to.clone_from(&df_desc.until);
335                }
336            } else {
337                warn!(as_of = %as_of, "as_of + 1 overflow");
338            }
339        }
340
341        // Construct TransformCtx for global optimization.
342        let mut transform_ctx = TransformCtx::global(
343            &df_builder,
344            &*stats,
345            &self.config.features,
346            &self.typecheck_ctx,
347            &mut df_meta,
348            Some(&self.metrics),
349        );
350        // Run global optimization.
351        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
352
353        if self.config.mode == OptimizeMode::Explain {
354            // Collect the list of indexes used by the dataflow at this point.
355            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
356        }
357
358        // Ensure all expressions are normalized before finalizing.
359        for build in df_desc.objects_to_build.iter_mut() {
360            normalize_lets(&mut build.plan.0, &self.config.features)?
361        }
362
363        // Finalize the dataflow. This includes:
364        // - MIR ⇒ LIR lowering
365        // - LIR ⇒ LIR transforms
366        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
367
368        // Trace the pipeline output under `optimize`.
369        trace_plan(&df_desc);
370
371        self.duration += time.elapsed();
372        self.metrics
373            .observe_e2e_optimization_time("copy_to", self.duration);
374
375        Ok(GlobalLirPlan { df_desc, df_meta })
376    }
377}
378
379impl GlobalLirPlan {
380    /// Unwraps the parts of the final result of the optimization pipeline.
381    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
382        (self.df_desc, self.df_meta)
383    }
384}