Skip to main content

mz_adapter/optimize/
materialized_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `CREATE MATERIALIZED VIEW` statements.
11//!
12//! Note that, in contrast to other optimization pipelines, timestamp selection is not part of
13//! MV optimization. Instead users are expected to separately set the as-of on the optimized
14//! `DataflowDescription` received from `GlobalLirPlan::unapply`. Reasons for choosing to exclude
15//! timestamp selection from the MV optimization pipeline are:
16//!
17//!  (a) MVs don't support non-empty `until` frontiers, so they don't provide opportunity for
18//!      optimizations based on the selected timestamp.
19//!  (b) We want to generate dataflow plans early during environment bootstrapping, before we have
20//!      access to all information required for timestamp selection.
21//!
22//! None of this is set in stone though. If we find an opportunity for optimizing MVs based on
23//! their timestamps, we'll want to make timestamp selection part of the MV optimization again and
24//! find a different approach to bootstrapping.
25//!
26//! See also MaterializeInc/materialize#22940.
27
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::plan::Plan;
32use mz_compute_types::sinks::{
33    ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
34};
35use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
36use mz_repr::explain::trace_plan;
37use mz_repr::refresh_schedule::RefreshSchedule;
38use mz_repr::{ColumnName, GlobalId, RelationDesc, SqlRelationType};
39use mz_sql::optimizer_metrics::OptimizerMetrics;
40use mz_sql::plan::HirRelationExpr;
41use mz_transform::TransformCtx;
42use mz_transform::dataflow::DataflowMetainfo;
43use mz_transform::normalize_lets::normalize_lets;
44use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
45use timely::progress::Antichain;
46
47use crate::coord::infer_sql_type_for_catalog;
48use crate::optimize::dataflows::{
49    ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
50};
51use crate::optimize::{
52    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
53    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
54};
55
56pub struct Optimizer {
57    /// A representation typechecking context to use throughout the optimizer pipeline.
58    typecheck_ctx: SharedTypecheckingContext,
59    /// A snapshot of the catalog state.
60    catalog: Arc<dyn OptimizerCatalog>,
61    /// A snapshot of the cluster that will run the dataflows.
62    compute_instance: ComputeInstanceSnapshot,
63    /// A durable GlobalId to be used with the exported materialized view sink.
64    sink_id: GlobalId,
65    /// A transient GlobalId to be used when constructing the dataflow.
66    view_id: GlobalId,
67    /// The resulting column names.
68    column_names: Vec<ColumnName>,
69    /// Output columns that are asserted to be not null in the `CREATE VIEW`
70    /// statement.
71    non_null_assertions: Vec<usize>,
72    /// Refresh schedule, e.g., `REFRESH EVERY '1 day'`
73    refresh_schedule: Option<RefreshSchedule>,
74    /// A human-readable name exposed internally (useful for debugging).
75    debug_name: String,
76    /// Optimizer config.
77    config: OptimizerConfig,
78    /// Optimizer metrics.
79    metrics: OptimizerMetrics,
80    /// The time spent performing optimization so far.
81    duration: Duration,
82}
83
84impl Optimizer {
85    pub fn new(
86        catalog: Arc<dyn OptimizerCatalog>,
87        compute_instance: ComputeInstanceSnapshot,
88        sink_id: GlobalId,
89        view_id: GlobalId,
90        column_names: Vec<ColumnName>,
91        non_null_assertions: Vec<usize>,
92        refresh_schedule: Option<RefreshSchedule>,
93        debug_name: String,
94        config: OptimizerConfig,
95        metrics: OptimizerMetrics,
96    ) -> Self {
97        Self {
98            typecheck_ctx: empty_typechecking_context(),
99            catalog,
100            compute_instance,
101            sink_id,
102            view_id,
103            column_names,
104            non_null_assertions,
105            refresh_schedule,
106            debug_name,
107            config,
108            metrics,
109            duration: Default::default(),
110        }
111    }
112}
113
114/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
115/// and MIR optimization.
116#[derive(Clone, Debug)]
117pub struct LocalMirPlan {
118    expr: MirRelationExpr,
119    df_meta: DataflowMetainfo,
120    typ: SqlRelationType,
121}
122
123/// The (sealed intermediate) result after:
124///
125/// 1. embedding a [`LocalMirPlan`] into a [`MirDataflowDescription`],
126/// 2. transitively inlining referenced views, and
127/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
128#[derive(Clone, Debug)]
129pub struct GlobalMirPlan {
130    df_desc: MirDataflowDescription,
131    df_meta: DataflowMetainfo,
132}
133
134impl GlobalMirPlan {
135    pub fn df_desc(&self) -> &MirDataflowDescription {
136        &self.df_desc
137    }
138}
139
140/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
141/// `DataflowDescription` with `LIR` plans.
142#[derive(Clone, Debug)]
143pub struct GlobalLirPlan {
144    df_desc: LirDataflowDescription,
145    df_meta: DataflowMetainfo,
146}
147
148impl GlobalLirPlan {
149    pub fn df_desc(&self) -> &LirDataflowDescription {
150        &self.df_desc
151    }
152
153    pub fn df_meta(&self) -> &DataflowMetainfo {
154        &self.df_meta
155    }
156
157    pub fn desc(&self) -> &RelationDesc {
158        let sink_exports = &self.df_desc.sink_exports;
159        let sink = sink_exports.values().next().expect("valid sink");
160        &sink.from_desc
161    }
162}
163
164impl Optimize<HirRelationExpr> for Optimizer {
165    type To = LocalMirPlan;
166
167    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
168        let time = Instant::now();
169
170        // Trace the pipeline input under `optimize/raw`.
171        trace_plan!(at: "raw", &expr);
172
173        // HIR ⇒ MIR lowering and decorrelation
174        let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
175
176        // MIR ⇒ MIR optimization (local)
177        let mut df_meta = DataflowMetainfo::default();
178        let mut transform_ctx = TransformCtx::local(
179            &self.config.features,
180            &self.typecheck_ctx,
181            &mut df_meta,
182            Some(&mut self.metrics),
183            Some(self.view_id),
184        );
185        let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
186        let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
187
188        self.duration += time.elapsed();
189
190        // Return the (sealed) plan at the end of this optimization step.
191        Ok(LocalMirPlan {
192            expr: mir_expr,
193            df_meta,
194            typ,
195        })
196    }
197}
198
199impl LocalMirPlan {
200    pub fn expr(&self) -> OptimizedMirRelationExpr {
201        OptimizedMirRelationExpr(self.expr.clone())
202    }
203}
204
205/// This is needed only because the pipeline in the bootstrap code starts from an
206/// [`OptimizedMirRelationExpr`] attached to a [`mz_catalog::memory::objects::CatalogItem`].
207impl Optimize<(OptimizedMirRelationExpr, SqlRelationType)> for Optimizer {
208    type To = GlobalMirPlan;
209
210    fn optimize(
211        &mut self,
212        (expr, typ): (OptimizedMirRelationExpr, SqlRelationType),
213    ) -> Result<Self::To, OptimizerError> {
214        let expr = expr.into_inner();
215        let df_meta = DataflowMetainfo::default();
216        self.optimize(LocalMirPlan { expr, df_meta, typ })
217    }
218}
219
220impl Optimize<LocalMirPlan> for Optimizer {
221    type To = GlobalMirPlan;
222
223    fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
224        let time = Instant::now();
225
226        let expr = OptimizedMirRelationExpr(plan.expr);
227        let mut df_meta = plan.df_meta;
228
229        let mut rel_typ = plan.typ;
230        for &i in self.non_null_assertions.iter() {
231            rel_typ.column_types[i].nullable = false;
232        }
233        let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
234
235        let mut df_builder = {
236            let compute = self.compute_instance.clone();
237            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
238        };
239        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
240
241        df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
242
243        df_builder.import_view_into_dataflow(
244            &self.view_id,
245            &expr,
246            &mut df_desc,
247            &self.config.features,
248        )?;
249        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
250
251        let sink_description = ComputeSinkDesc {
252            from: self.view_id,
253            from_desc: rel_desc.clone(),
254            connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
255                value_desc: rel_desc,
256                storage_metadata: (),
257            }),
258            with_snapshot: true,
259            up_to: Antichain::default(),
260            non_null_assertions: self.non_null_assertions.clone(),
261            refresh_schedule: self.refresh_schedule.clone(),
262        };
263        df_desc.export_sink(self.sink_id, sink_description);
264
265        // Prepare expressions in the assembled dataflow.
266        let style = ExprPrepMaintained;
267        df_desc.visit_children(
268            |r| style.prep_relation_expr(r),
269            |s| style.prep_scalar_expr(s),
270        )?;
271
272        // Construct TransformCtx for global optimization.
273        let mut transform_ctx = TransformCtx::global(
274            &df_builder,
275            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
276            &self.config.features,
277            &self.typecheck_ctx,
278            &mut df_meta,
279            Some(&mut self.metrics),
280        );
281        // Run global optimization.
282        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
283
284        if self.config.mode == OptimizeMode::Explain {
285            // Collect the list of indexes used by the dataflow at this point.
286            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
287        }
288
289        self.duration += time.elapsed();
290
291        // Return the (sealed) plan at the end of this optimization step.
292        Ok(GlobalMirPlan { df_desc, df_meta })
293    }
294}
295
296impl Optimize<GlobalMirPlan> for Optimizer {
297    type To = GlobalLirPlan;
298
299    fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
300        let time = Instant::now();
301
302        let GlobalMirPlan {
303            mut df_desc,
304            df_meta,
305        } = plan;
306
307        // Ensure all expressions are normalized before finalizing.
308        for build in df_desc.objects_to_build.iter_mut() {
309            normalize_lets(&mut build.plan.0, &self.config.features)?
310        }
311
312        // Finalize the dataflow. This includes:
313        // - MIR ⇒ LIR lowering
314        // - LIR ⇒ LIR transforms
315        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
316
317        // Trace the pipeline output under `optimize`.
318        trace_plan(&df_desc);
319
320        self.duration += time.elapsed();
321        self.metrics
322            .observe_e2e_optimization_time("materialized_view", self.duration);
323
324        // Return the plan at the end of this `optimize` step.
325        Ok(GlobalLirPlan { df_desc, df_meta })
326    }
327}
328
329impl GlobalLirPlan {
330    /// Unwraps the parts of the final result of the optimization pipeline.
331    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
332        (self.df_desc, self.df_meta)
333    }
334}