Skip to main content

mz_adapter/optimize/
index.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `CREATE INDEX` statements.
11//!
12//! Note that, in contrast to other optimization pipelines, timestamp selection is not part of
13//! index optimization. Instead users are expected to separately set the as-of on the optimized
14//! `DataflowDescription` received from `GlobalLirPlan::unapply`. Reasons for choosing to exclude
15//! timestamp selection from the index optimization pipeline are:
16//!
17//!  (a) Indexes don't support non-empty `until` frontiers, so they don't provide opportunity for
18//!      optimizations based on the selected timestamp.
19//!  (b) We want to generate dataflow plans early during environment bootstrapping, before we have
20//!      access to all information required for timestamp selection.
21//!
22//! None of this is set in stone though. If we find an opportunity for optimizing indexes based on
23//! their timestamps, we'll want to make timestamp selection part of the index optimization again
24//! and find a different approach to bootstrapping.
25//!
26//! See also MaterializeInc/materialize#22940.
27
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::dataflows::IndexDesc;
32use mz_compute_types::plan::Plan;
33use mz_repr::explain::trace_plan;
34use mz_repr::{GlobalId, ReprRelationType};
35use mz_sql::names::QualifiedItemName;
36use mz_sql::optimizer_metrics::OptimizerMetrics;
37use mz_transform::TransformCtx;
38use mz_transform::dataflow::DataflowMetainfo;
39use mz_transform::normalize_lets::normalize_lets;
40use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
41use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
42
43use crate::optimize::dataflows::{
44    ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
45};
46use crate::optimize::{
47    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
48    OptimizerConfig, OptimizerError, trace_plan,
49};
50
/// An optimizer for `CREATE INDEX` statements.
///
/// Drives the two-stage pipeline `Index` → [`GlobalMirPlan`] → [`GlobalLirPlan`]
/// implemented by the [`Optimize`] impls for this type.
pub struct Optimizer {
    /// A typechecking context to use throughout the optimizer pipeline.
    typecheck_ctx: SharedTypecheckingContext,
    /// A snapshot of the catalog state.
    catalog: Arc<dyn OptimizerCatalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A durable GlobalId to be used with the exported index arrangement.
    exported_index_id: GlobalId,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The total time spent in both `optimize` stages so far (reported to
    /// `metrics` at the end of the second stage).
    duration: Duration,
}
67
68impl Optimizer {
69    pub fn new(
70        catalog: Arc<dyn OptimizerCatalog>,
71        compute_instance: ComputeInstanceSnapshot,
72        exported_index_id: GlobalId,
73        config: OptimizerConfig,
74        metrics: OptimizerMetrics,
75    ) -> Self {
76        Self {
77            typecheck_ctx: empty_typechecking_context(),
78            catalog,
79            compute_instance,
80            exported_index_id,
81            config,
82            metrics,
83            duration: Default::default(),
84        }
85    }
86}
87
/// A wrapper of index parts needed to start the optimization process.
pub struct Index {
    /// The qualified name of the index (resolved to a full name for the
    /// dataflow's debug name during optimization).
    name: QualifiedItemName,
    /// The `GlobalId` of the collection the index is built on.
    on: GlobalId,
    /// The key expressions of the index arrangement.
    keys: Vec<mz_expr::MirScalarExpr>,
}
94
95impl Index {
96    /// Construct a new [`Index`]. Arguments are recorded as-is.
97    pub fn new(name: QualifiedItemName, on: GlobalId, keys: Vec<mz_expr::MirScalarExpr>) -> Self {
98        Self { name, on, keys }
99    }
100}
101
/// The (sealed intermediate) result after:
///
/// 1. embedding an [`Index`] into a [`MirDataflowDescription`],
/// 2. transitively inlining referenced views, and
/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The dataflow description holding the globally optimized `MIR` plans.
    df_desc: MirDataflowDescription,
    /// Metainfo (e.g. optimizer notices) accumulated during optimization.
    df_meta: DataflowMetainfo,
}
112
113impl GlobalMirPlan {
114    pub fn df_desc(&self) -> &MirDataflowDescription {
115        &self.df_desc
116    }
117}
118
/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
/// `DataflowDescription` with `LIR` plans.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The dataflow description holding the finalized `LIR` plans.
    df_desc: LirDataflowDescription,
    /// Metainfo (e.g. optimizer notices) accumulated during optimization.
    df_meta: DataflowMetainfo,
}
126
127impl GlobalLirPlan {
128    pub fn df_desc(&self) -> &LirDataflowDescription {
129        &self.df_desc
130    }
131
132    pub fn df_meta(&self) -> &DataflowMetainfo {
133        &self.df_meta
134    }
135}
136
impl Optimize<Index> for Optimizer {
    type To = GlobalMirPlan;

    /// Builds and globally optimizes a `MIR` dataflow for the given [`Index`].
    ///
    /// Steps (order matters):
    /// 1. resolve the indexed collection and its relation description from the
    ///    catalog;
    /// 2. import the collection (and transitively, referenced views) into a
    ///    fresh dataflow and export the index arrangement from it;
    /// 3. prepare expressions and run the global `MIR` optimization pass;
    /// 4. emit optimizer notices for degenerate cases (empty key, duplicate
    ///    index).
    ///
    /// # Errors
    ///
    /// Returns an [`OptimizerError`] if dataflow assembly, expression
    /// preparation, or global optimization fails.
    fn optimize(&mut self, index: Index) -> Result<Self::To, OptimizerError> {
        // Start the wall-clock timer that feeds `self.duration`.
        let time = Instant::now();

        let on_entry = self.catalog.get_entry(&index.on);
        let full_name = self
            .catalog
            .resolve_full_name(&index.name, on_entry.conn_id());
        // Panics (by design) if the indexed item has no relation description;
        // planning should have rejected such statements earlier.
        let on_desc = on_entry
            .relation_desc()
            .expect("can only create indexes on items with a valid description");

        let mut df_builder = {
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
        };
        // The dataflow is named after the index's resolved full name.
        let mut df_desc = MirDataflowDescription::new(full_name.to_string());

        // Pull the indexed collection (and its dependencies) into the dataflow.
        df_builder.import_into_dataflow(&index.on, &mut df_desc, &self.config.features)?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // Export the new index arrangement from the dataflow, keyed by the
        // requested key expressions and typed like the indexed collection.
        let index_desc = IndexDesc {
            on_id: index.on,
            key: index.keys.clone(),
        };
        df_desc.export_index(
            self.exported_index_id,
            index_desc,
            ReprRelationType::from(on_desc.typ()),
        );

        // Prepare expressions in the assembled dataflow.
        let style = ExprPrepMaintained;
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // Construct TransformCtx for global optimization.
        let mut df_meta = DataflowMetainfo::default();
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
            &self.config.features,
            &self.typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );
        // Run global optimization.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        if self.config.mode == OptimizeMode::Explain {
            // Collect the list of indexes used by the dataflow at this point
            // (only needed for EXPLAIN output).
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        // Emit a notice if we are trying to create an empty index.
        if index.keys.is_empty() {
            df_meta.push_optimizer_notice_dedup(IndexKeyEmpty);
        }

        // Emit a notice for each available index identical to the one we are
        // currently optimizing (same collection, same key expressions).
        for (index_id, idx) in df_builder
            .indexes_on(index.on)
            .filter(|(_id, idx)| idx.keys.as_ref() == &index.keys)
        {
            df_meta.push_optimizer_notice_dedup(IndexAlreadyExists {
                index_id,
                index_key: idx.keys.to_vec(),
                index_on_id: idx.on,
                exported_index_id: self.exported_index_id,
            });
        }

        // Accumulate elapsed time; the second stage adds to the same counter
        // before reporting it to metrics.
        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(GlobalMirPlan { df_desc, df_meta })
    }
}
220
impl Optimize<GlobalMirPlan> for Optimizer {
    type To = GlobalLirPlan;

    /// Lowers a [`GlobalMirPlan`] to a [`GlobalLirPlan`].
    ///
    /// Normalizes let-bindings in every object to build, then finalizes the
    /// dataflow (MIR ⇒ LIR lowering plus LIR-level transforms). Also reports
    /// the accumulated end-to-end optimization time to metrics.
    ///
    /// # Errors
    ///
    /// Returns an [`OptimizerError`] if normalization or dataflow
    /// finalization fails.
    fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
        // Resume the wall-clock timer started in the first `optimize` stage.
        let time = Instant::now();

        let GlobalMirPlan {
            mut df_desc,
            df_meta,
        } = plan;

        // Ensure all expressions are normalized before finalizing.
        // NOTE(review): `finalize_dataflow` presumably relies on normalized
        // let-bindings — confirm against `Plan::finalize_dataflow`'s contract.
        for build in df_desc.objects_to_build.iter_mut() {
            normalize_lets(&mut build.plan.0, &self.config.features)?
        }

        // Finalize the dataflow. This includes:
        // - MIR ⇒ LIR lowering
        // - LIR ⇒ LIR transforms
        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

        // Trace the pipeline output under `optimize`.
        trace_plan(&df_desc);

        // Add this stage's elapsed time to the pipeline total and report the
        // end-to-end duration for the `index` pipeline.
        self.duration += time.elapsed();
        self.metrics
            .observe_e2e_optimization_time("index", self.duration);

        // Return the plan at the end of this `optimize` step.
        Ok(GlobalLirPlan { df_desc, df_meta })
    }
}
253
254impl GlobalLirPlan {
255    /// Unwraps the parts of the final result of the optimization pipeline.
256    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
257        (self.df_desc, self.df_meta)
258    }
259}