// mz_adapter/optimize/materialized_view.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `CREATE MATERIALIZED VIEW` statements.
//!
//! Note that, in contrast to other optimization pipelines, timestamp selection is not part of
//! MV optimization. Instead users are expected to separately set the as-of on the optimized
//! `DataflowDescription` received from `GlobalLirPlan::unapply`. Reasons for choosing to exclude
//! timestamp selection from the MV optimization pipeline are:
//!
//!  (a) MVs don't support non-empty `until` frontiers, so they don't provide opportunity for
//!      optimizations based on the selected timestamp.
//!  (b) We want to generate dataflow plans early during environment bootstrapping, before we have
//!      access to all information required for timestamp selection.
//!
//! None of this is set in stone though. If we find an opportunity for optimizing MVs based on
//! their timestamps, we'll want to make timestamp selection part of the MV optimization again and
//! find a different approach to bootstrapping.
//!
//! See also MaterializeInc/materialize#22940.
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

use mz_compute_types::plan::Plan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
};
use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
use mz_repr::explain::trace_plan;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{ColumnName, GlobalId, RelationDesc, SqlRelationType};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::HirRelationExpr;
use mz_transform::TransformCtx;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::reprtypecheck::{
    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
};
use timely::progress::Antichain;

use crate::coord::infer_sql_type_for_catalog;
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
};
use crate::optimize::{
    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
};
/// An optimizer for `CREATE MATERIALIZED VIEW` statements.
///
/// Bundles the catalog/cluster snapshots, the relevant `GlobalId`s, and the
/// optimizer configuration needed to drive a statement through the
/// HIR ⇒ MIR ⇒ LIR pipeline via the [`Optimize`] implementations below.
pub struct Optimizer {
    /// A representation typechecking context to use throughout the optimizer pipeline.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<dyn OptimizerCatalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A durable GlobalId to be used with the exported materialized view sink.
    sink_id: GlobalId,
    /// A transient GlobalId to be used when constructing the dataflow.
    view_id: GlobalId,
    /// The resulting column names.
    column_names: Vec<ColumnName>,
    /// Output columns that are asserted to be not null in the
    /// `CREATE MATERIALIZED VIEW` statement.
    non_null_assertions: Vec<usize>,
    /// Refresh schedule, e.g., `REFRESH EVERY '1 day'`
    refresh_schedule: Option<RefreshSchedule>,
    /// A human-readable name exposed internally (useful for debugging).
    debug_name: String,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    ///
    /// Accumulated across pipeline stages and reported via `metrics` at the
    /// end of the final (LIR) stage.
    duration: Duration,
    /// Overrides monotonicity for the given source collections.
    ///
    /// This is here only for continual tasks, which at runtime introduce
    /// synthetic retractions to "input sources". If/when we split a CT
    /// optimizer out of the MV optimizer, this can be removed.
    ///
    /// TODO(ct3): There are other differences between a GlobalId used as a CT
    /// input vs as a normal collection, such as the statistical size estimates.
    /// Plus, at the moment, it is not possible to use the same GlobalId as both
    /// an "input" and a "reference" in a CT. So, better than this approach
    /// would be for the optimizer itself to somehow understand the distinction
    /// between a CT input and a normal collection.
    ///
    /// In the meantime, it might be desirable to refactor the MV optimizer to
    /// have a small amount of knowledge about CTs, in particular producing the
    /// CT sink connection directly. This would allow us to replace this field
    /// with something derived directly from that sink connection.
    force_source_non_monotonic: BTreeSet<GlobalId>,
}
104
105impl Optimizer {
106    pub fn new(
107        catalog: Arc<dyn OptimizerCatalog>,
108        compute_instance: ComputeInstanceSnapshot,
109        sink_id: GlobalId,
110        view_id: GlobalId,
111        column_names: Vec<ColumnName>,
112        non_null_assertions: Vec<usize>,
113        refresh_schedule: Option<RefreshSchedule>,
114        debug_name: String,
115        config: OptimizerConfig,
116        metrics: OptimizerMetrics,
117        force_source_non_monotonic: BTreeSet<GlobalId>,
118    ) -> Self {
119        Self {
120            repr_typecheck_ctx: empty_repr_context(),
121            catalog,
122            compute_instance,
123            sink_id,
124            view_id,
125            column_names,
126            non_null_assertions,
127            refresh_schedule,
128            debug_name,
129            config,
130            metrics,
131            duration: Default::default(),
132            force_source_non_monotonic,
133        }
134    }
135}
136
/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
/// and MIR optimization.
#[derive(Clone, Debug)]
pub struct LocalMirPlan {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Metainfo collected during optimization so far.
    df_meta: DataflowMetainfo,
    /// The SQL-level relation type inferred for `expr`.
    typ: SqlRelationType,
}
145
/// The (sealed intermediate) result after:
///
/// 1. embedding a [`LocalMirPlan`] into a [`MirDataflowDescription`],
/// 2. transitively inlining referenced views, and
/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The globally optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metainfo collected during optimization so far.
    df_meta: DataflowMetainfo,
}
156
157impl GlobalMirPlan {
158    pub fn df_desc(&self) -> &MirDataflowDescription {
159        &self.df_desc
160    }
161}
162
/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
/// `DataflowDescription` with `LIR` plans.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metainfo collected throughout the optimization pipeline.
    df_meta: DataflowMetainfo,
}
170
171impl GlobalLirPlan {
172    pub fn df_desc(&self) -> &LirDataflowDescription {
173        &self.df_desc
174    }
175
176    pub fn df_meta(&self) -> &DataflowMetainfo {
177        &self.df_meta
178    }
179
180    pub fn desc(&self) -> &RelationDesc {
181        let sink_exports = &self.df_desc.sink_exports;
182        let sink = sink_exports.values().next().expect("valid sink");
183        &sink.from_desc
184    }
185}
186
/// Pipeline stage 1: HIR ⇒ MIR lowering, decorrelation, and local
/// (single-view) MIR optimization.
impl Optimize<HirRelationExpr> for Optimizer {
    type To = LocalMirPlan;

    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
        // Time spent here is accumulated into `self.duration` and reported to
        // the metrics at the end of the final pipeline stage.
        let time = Instant::now();

        // Trace the pipeline input under `optimize/raw`.
        trace_plan!(at: "raw", &expr);

        // HIR ⇒ MIR lowering and decorrelation.
        // `expr` is cloned because the original HIR expression is needed again
        // below for SQL type inference.
        let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;

        // MIR ⇒ MIR optimization (local)
        let mut df_meta = DataflowMetainfo::default();
        let mut transform_ctx = TransformCtx::local(
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
            Some(self.view_id),
        );
        let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
        // Derive the SQL-level relation type from the original HIR expression
        // and the optimized MIR expression.
        let typ = infer_sql_type_for_catalog(&expr, &mir_expr);

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(LocalMirPlan {
            expr: mir_expr,
            df_meta,
            typ,
        })
    }
}
221
222impl LocalMirPlan {
223    pub fn expr(&self) -> OptimizedMirRelationExpr {
224        OptimizedMirRelationExpr(self.expr.clone())
225    }
226}
227
228/// This is needed only because the pipeline in the bootstrap code starts from an
229/// [`OptimizedMirRelationExpr`] attached to a [`mz_catalog::memory::objects::CatalogItem`].
230impl Optimize<(OptimizedMirRelationExpr, SqlRelationType)> for Optimizer {
231    type To = GlobalMirPlan;
232
233    fn optimize(
234        &mut self,
235        (expr, typ): (OptimizedMirRelationExpr, SqlRelationType),
236    ) -> Result<Self::To, OptimizerError> {
237        let expr = expr.into_inner();
238        let df_meta = DataflowMetainfo::default();
239        self.optimize(LocalMirPlan { expr, df_meta, typ })
240    }
241}
242
/// Pipeline stage 2: embed the locally optimized expression into a dataflow,
/// import its dependencies, attach the sink export, and run the global MIR
/// optimization over the assembled dataflow.
impl Optimize<LocalMirPlan> for Optimizer {
    type To = GlobalMirPlan;

    fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let expr = OptimizedMirRelationExpr(plan.expr);
        let mut df_meta = plan.df_meta;

        // Apply the statement's NOT NULL assertions to the relation type
        // before constructing the sink's `RelationDesc`.
        let mut rel_typ = plan.typ;
        for &i in self.non_null_assertions.iter() {
            rel_typ.column_types[i].nullable = false;
        }
        let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());

        let mut df_builder = {
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
        };
        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());

        df_desc.refresh_schedule.clone_from(&self.refresh_schedule);

        // Import the optimized view expression into the dataflow under the
        // transient `view_id`, pulling in the collections it references.
        df_builder.import_view_into_dataflow(
            &self.view_id,
            &expr,
            &mut df_desc,
            &self.config.features,
        )?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // Export the view's output as a materialized view sink under the
        // durable `sink_id`. MVs don't support non-empty `until` frontiers
        // (see the module docs), hence the default (empty) `up_to`.
        let sink_description = ComputeSinkDesc {
            from: self.view_id,
            from_desc: rel_desc.clone(),
            connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
                value_desc: rel_desc,
                storage_metadata: (),
            }),
            with_snapshot: true,
            up_to: Antichain::default(),
            non_null_assertions: self.non_null_assertions.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
        };
        df_desc.export_sink(self.sink_id, sink_description);

        // Prepare expressions in the assembled dataflow.
        let style = ExprPrepMaintained;
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // Construct TransformCtx for global optimization.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );
        // Apply source monotonicity overrides. Only relevant for continual
        // tasks; see the comment on `force_source_non_monotonic`.
        for id in self.force_source_non_monotonic.iter() {
            if let Some(import) = df_desc.source_imports.get_mut(id) {
                import.monotonic = false;
            }
        }
        // Run global optimization.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        if self.config.mode == OptimizeMode::Explain {
            // Collect the list of indexes used by the dataflow at this point.
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(GlobalMirPlan { df_desc, df_meta })
    }
}
324
/// Pipeline stage 3 (final): MIR ⇒ LIR lowering and LIR-level optimization of
/// the dataflow description.
impl Optimize<GlobalMirPlan> for Optimizer {
    type To = GlobalLirPlan;

    fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let GlobalMirPlan {
            mut df_desc,
            df_meta,
        } = plan;

        // Ensure all expressions are normalized before finalizing.
        for build in df_desc.objects_to_build.iter_mut() {
            normalize_lets(&mut build.plan.0, &self.config.features)?
        }

        // Finalize the dataflow. This includes:
        // - MIR ⇒ LIR lowering
        // - LIR ⇒ LIR transforms
        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

        // Trace the pipeline output under `optimize`.
        trace_plan(&df_desc);

        // This is the last pipeline stage, so report the end-to-end
        // optimization time accumulated across all stages.
        self.duration += time.elapsed();
        self.metrics
            .observe_e2e_optimization_time("materialized_view", self.duration);

        // Return the plan at the end of this `optimize` step.
        Ok(GlobalLirPlan { df_desc, df_meta })
    }
}
357
358impl GlobalLirPlan {
359    /// Unwraps the parts of the final result of the optimization pipeline.
360    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
361        (self.df_desc, self.df_meta)
362    }
363}