mz_adapter/optimize/
materialized_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Optimizer implementation for `CREATE MATERIALIZED VIEW` statements.
//!
//! Note that, in contrast to other optimization pipelines, timestamp selection is not part of
//! MV optimization. Instead, users are expected to set the as-of separately on the optimized
//! `DataflowDescription` received from `GlobalLirPlan::unapply`. The reasons for excluding
//! timestamp selection from the MV optimization pipeline are:
//!
//!  (a) MVs don't support non-empty `until` frontiers, so they don't provide an opportunity for
//!      optimizations based on the selected timestamp.
//!  (b) We want to generate dataflow plans early during environment bootstrapping, before we have
//!      access to all information required for timestamp selection.
//!
//! None of this is set in stone, though. If we find an opportunity for optimizing MVs based on
//! their timestamps, we'll want to make timestamp selection part of the MV optimization again and
//! find a different approach to bootstrapping.
//!
//! See also MaterializeInc/materialize#22940.
27
28use std::collections::BTreeSet;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32use mz_compute_types::plan::Plan;
33use mz_compute_types::sinks::{
34    ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection,
35};
36use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
37use mz_repr::explain::trace_plan;
38use mz_repr::refresh_schedule::RefreshSchedule;
39use mz_repr::{ColumnName, GlobalId, RelationDesc};
40use mz_sql::optimizer_metrics::OptimizerMetrics;
41use mz_sql::plan::HirRelationExpr;
42use mz_transform::TransformCtx;
43use mz_transform::dataflow::DataflowMetainfo;
44use mz_transform::normalize_lets::normalize_lets;
45use mz_transform::reprtypecheck::{
46    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
47};
48use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
49use timely::progress::Antichain;
50
51use crate::optimize::dataflows::{
52    ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
53};
54use crate::optimize::{
55    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
56    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
57};
58
59pub struct Optimizer {
60    /// A typechecking context to use throughout the optimizer pipeline.
61    typecheck_ctx: TypecheckContext,
62    /// A representation typechecking context to use throughout the optimizer pipeline.
63    repr_typecheck_ctx: ReprTypecheckContext,
64    /// A snapshot of the catalog state.
65    catalog: Arc<dyn OptimizerCatalog>,
66    /// A snapshot of the cluster that will run the dataflows.
67    compute_instance: ComputeInstanceSnapshot,
68    /// A durable GlobalId to be used with the exported materialized view sink.
69    sink_id: GlobalId,
70    /// A transient GlobalId to be used when constructing the dataflow.
71    view_id: GlobalId,
72    /// The resulting column names.
73    column_names: Vec<ColumnName>,
74    /// Output columns that are asserted to be not null in the `CREATE VIEW`
75    /// statement.
76    non_null_assertions: Vec<usize>,
77    /// Refresh schedule, e.g., `REFRESH EVERY '1 day'`
78    refresh_schedule: Option<RefreshSchedule>,
79    /// A human-readable name exposed internally (useful for debugging).
80    debug_name: String,
81    /// Optimizer config.
82    config: OptimizerConfig,
83    /// Optimizer metrics.
84    metrics: OptimizerMetrics,
85    /// The time spent performing optimization so far.
86    duration: Duration,
87    /// Overrides monotonicity for the given source collections.
88    ///
89    /// This is here only for continual tasks, which at runtime introduce
90    /// synthetic retractions to "input sources". If/when we split a CT
91    /// optimizer out of the MV optimizer, this can be removed.
92    ///
93    /// TODO(ct3): There are other differences between a GlobalId used as a CT
94    /// input vs as a normal collection, such as the statistical size estimates.
95    /// Plus, at the moment, it is not possible to use the same GlobalId as both
96    /// an "input" and a "reference" in a CT. So, better than this approach
97    /// would be for the optimizer itself to somehow understand the distinction
98    /// between a CT input and a normal collection.
99    ///
100    /// In the meantime, it might be desirable to refactor the MV optimizer to
101    /// have a small amount of knowledge about CTs, in particular producing the
102    /// CT sink connection directly. This would allow us to replace this field
103    /// with something derived directly from that sink connection.
104    force_source_non_monotonic: BTreeSet<GlobalId>,
105}
106
107impl Optimizer {
108    pub fn new(
109        catalog: Arc<dyn OptimizerCatalog>,
110        compute_instance: ComputeInstanceSnapshot,
111        sink_id: GlobalId,
112        view_id: GlobalId,
113        column_names: Vec<ColumnName>,
114        non_null_assertions: Vec<usize>,
115        refresh_schedule: Option<RefreshSchedule>,
116        debug_name: String,
117        config: OptimizerConfig,
118        metrics: OptimizerMetrics,
119        force_source_non_monotonic: BTreeSet<GlobalId>,
120    ) -> Self {
121        Self {
122            typecheck_ctx: empty_context(),
123            repr_typecheck_ctx: empty_repr_context(),
124            catalog,
125            compute_instance,
126            sink_id,
127            view_id,
128            column_names,
129            non_null_assertions,
130            refresh_schedule,
131            debug_name,
132            config,
133            metrics,
134            duration: Default::default(),
135            force_source_non_monotonic,
136        }
137    }
138}
139
140/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
141/// and MIR optimization.
142#[derive(Clone, Debug)]
143pub struct LocalMirPlan {
144    expr: MirRelationExpr,
145    df_meta: DataflowMetainfo,
146}
147
148/// The (sealed intermediate) result after:
149///
150/// 1. embedding a [`LocalMirPlan`] into a [`MirDataflowDescription`],
151/// 2. transitively inlining referenced views, and
152/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
153#[derive(Clone, Debug)]
154pub struct GlobalMirPlan {
155    df_desc: MirDataflowDescription,
156    df_meta: DataflowMetainfo,
157}
158
159impl GlobalMirPlan {
160    pub fn df_desc(&self) -> &MirDataflowDescription {
161        &self.df_desc
162    }
163}
164
165/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
166/// `DataflowDescription` with `LIR` plans.
167#[derive(Clone, Debug)]
168pub struct GlobalLirPlan {
169    df_desc: LirDataflowDescription,
170    df_meta: DataflowMetainfo,
171}
172
173impl GlobalLirPlan {
174    pub fn df_desc(&self) -> &LirDataflowDescription {
175        &self.df_desc
176    }
177
178    pub fn df_meta(&self) -> &DataflowMetainfo {
179        &self.df_meta
180    }
181
182    pub fn desc(&self) -> &RelationDesc {
183        let sink_exports = &self.df_desc.sink_exports;
184        let sink = sink_exports.values().next().expect("valid sink");
185        &sink.from_desc
186    }
187}
188
189impl Optimize<HirRelationExpr> for Optimizer {
190    type To = LocalMirPlan;
191
192    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
193        let time = Instant::now();
194
195        // Trace the pipeline input under `optimize/raw`.
196        trace_plan!(at: "raw", &expr);
197
198        // HIR ⇒ MIR lowering and decorrelation
199        let expr = expr.lower(&self.config, Some(&self.metrics))?;
200
201        // MIR ⇒ MIR optimization (local)
202        let mut df_meta = DataflowMetainfo::default();
203        let mut transform_ctx = TransformCtx::local(
204            &self.config.features,
205            &self.typecheck_ctx,
206            &self.repr_typecheck_ctx,
207            &mut df_meta,
208            Some(&self.metrics),
209            Some(self.view_id),
210        );
211        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
212
213        self.duration += time.elapsed();
214
215        // Return the (sealed) plan at the end of this optimization step.
216        Ok(LocalMirPlan { expr, df_meta })
217    }
218}
219
220impl LocalMirPlan {
221    pub fn expr(&self) -> OptimizedMirRelationExpr {
222        OptimizedMirRelationExpr(self.expr.clone())
223    }
224}
225
226/// This is needed only because the pipeline in the bootstrap code starts from an
227/// [`OptimizedMirRelationExpr`] attached to a [`mz_catalog::memory::objects::CatalogItem`].
228impl Optimize<OptimizedMirRelationExpr> for Optimizer {
229    type To = GlobalMirPlan;
230
231    fn optimize(&mut self, expr: OptimizedMirRelationExpr) -> Result<Self::To, OptimizerError> {
232        let expr = expr.into_inner();
233        let df_meta = DataflowMetainfo::default();
234        self.optimize(LocalMirPlan { expr, df_meta })
235    }
236}
237
238impl Optimize<LocalMirPlan> for Optimizer {
239    type To = GlobalMirPlan;
240
241    fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
242        let time = Instant::now();
243
244        let expr = OptimizedMirRelationExpr(plan.expr);
245        let mut df_meta = plan.df_meta;
246
247        let mut rel_typ = expr.typ();
248        for &i in self.non_null_assertions.iter() {
249            rel_typ.column_types[i].nullable = false;
250        }
251        let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());
252
253        let mut df_builder = {
254            let compute = self.compute_instance.clone();
255            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
256        };
257        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
258
259        df_desc.refresh_schedule.clone_from(&self.refresh_schedule);
260
261        df_builder.import_view_into_dataflow(
262            &self.view_id,
263            &expr,
264            &mut df_desc,
265            &self.config.features,
266        )?;
267        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
268
269        let sink_description = ComputeSinkDesc {
270            from: self.view_id,
271            from_desc: rel_desc.clone(),
272            connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
273                value_desc: rel_desc,
274                storage_metadata: (),
275            }),
276            with_snapshot: true,
277            up_to: Antichain::default(),
278            non_null_assertions: self.non_null_assertions.clone(),
279            refresh_schedule: self.refresh_schedule.clone(),
280        };
281        df_desc.export_sink(self.sink_id, sink_description);
282
283        // Prepare expressions in the assembled dataflow.
284        let style = ExprPrepStyle::Maintained;
285        df_desc.visit_children(
286            |r| prep_relation_expr(r, style),
287            |s| prep_scalar_expr(s, style),
288        )?;
289
290        // Construct TransformCtx for global optimization.
291        let mut transform_ctx = TransformCtx::global(
292            &df_builder,
293            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
294            &self.config.features,
295            &self.typecheck_ctx,
296            &self.repr_typecheck_ctx,
297            &mut df_meta,
298            Some(&self.metrics),
299        );
300        // Apply source monotonicity overrides.
301        for id in self.force_source_non_monotonic.iter() {
302            if let Some((_desc, monotonic, _upper)) = df_desc.source_imports.get_mut(id) {
303                *monotonic = false;
304            }
305        }
306        // Run global optimization.
307        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
308
309        if self.config.mode == OptimizeMode::Explain {
310            // Collect the list of indexes used by the dataflow at this point.
311            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
312        }
313
314        self.duration += time.elapsed();
315
316        // Return the (sealed) plan at the end of this optimization step.
317        Ok(GlobalMirPlan { df_desc, df_meta })
318    }
319}
320
321impl Optimize<GlobalMirPlan> for Optimizer {
322    type To = GlobalLirPlan;
323
324    fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
325        let time = Instant::now();
326
327        let GlobalMirPlan {
328            mut df_desc,
329            df_meta,
330        } = plan;
331
332        // Ensure all expressions are normalized before finalizing.
333        for build in df_desc.objects_to_build.iter_mut() {
334            normalize_lets(&mut build.plan.0, &self.config.features)?
335        }
336
337        // Finalize the dataflow. This includes:
338        // - MIR ⇒ LIR lowering
339        // - LIR ⇒ LIR transforms
340        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
341
342        // Trace the pipeline output under `optimize`.
343        trace_plan(&df_desc);
344
345        self.duration += time.elapsed();
346        self.metrics
347            .observe_e2e_optimization_time("materialized_view", self.duration);
348
349        // Return the plan at the end of this `optimize` step.
350        Ok(GlobalLirPlan { df_desc, df_meta })
351    }
352}
353
354impl GlobalLirPlan {
355    /// Unwraps the parts of the final result of the optimization pipeline.
356    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
357        (self.df_desc, self.df_meta)
358    }
359}