mz_adapter/optimize/
index.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `CREATE INDEX` statements.
11//!
//! Note that, in contrast to other optimization pipelines, timestamp selection is not part of
//! index optimization. Instead, users are expected to separately set the as-of on the optimized
//! `DataflowDescription` received from `GlobalLirPlan::unapply`. Reasons for choosing to exclude
//! timestamp selection from the index optimization pipeline are:
16//!
//!  (a) Indexes don't support non-empty `until` frontiers, so they don't provide an opportunity
//!      for optimizations based on the selected timestamp.
19//!  (b) We want to generate dataflow plans early during environment bootstrapping, before we have
20//!      access to all information required for timestamp selection.
21//!
22//! None of this is set in stone though. If we find an opportunity for optimizing indexes based on
23//! their timestamps, we'll want to make timestamp selection part of the index optimization again
24//! and find a different approach to bootstrapping.
25//!
26//! See also MaterializeInc/materialize#22940.
27
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::dataflows::IndexDesc;
32use mz_compute_types::plan::Plan;
33use mz_repr::GlobalId;
34use mz_repr::explain::trace_plan;
35use mz_sql::names::QualifiedItemName;
36use mz_sql::optimizer_metrics::OptimizerMetrics;
37use mz_transform::TransformCtx;
38use mz_transform::dataflow::DataflowMetainfo;
39use mz_transform::normalize_lets::normalize_lets;
40use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
41use mz_transform::reprtypecheck::{
42    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
43};
44use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
45
46use crate::optimize::dataflows::{
47    ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
48};
49use crate::optimize::{
50    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
51    OptimizerConfig, OptimizerError, trace_plan,
52};
53
54pub struct Optimizer {
55    /// A typechecking context to use throughout the optimizer pipeline.
56    typecheck_ctx: TypecheckContext,
57    /// A representation typechecking context to use throughout the optimizer pipeline.
58    repr_typecheck_ctx: ReprTypecheckContext,
59    /// A snapshot of the catalog state.
60    catalog: Arc<dyn OptimizerCatalog>,
61    /// A snapshot of the cluster that will run the dataflows.
62    compute_instance: ComputeInstanceSnapshot,
63    /// A durable GlobalId to be used with the exported index arrangement.
64    exported_index_id: GlobalId,
65    /// Optimizer config.
66    config: OptimizerConfig,
67    /// Optimizer metrics.
68    metrics: OptimizerMetrics,
69    /// The time spent performing optimization so far.
70    duration: Duration,
71}
72
73impl Optimizer {
74    pub fn new(
75        catalog: Arc<dyn OptimizerCatalog>,
76        compute_instance: ComputeInstanceSnapshot,
77        exported_index_id: GlobalId,
78        config: OptimizerConfig,
79        metrics: OptimizerMetrics,
80    ) -> Self {
81        Self {
82            typecheck_ctx: empty_context(),
83            repr_typecheck_ctx: empty_repr_context(),
84            catalog,
85            compute_instance,
86            exported_index_id,
87            config,
88            metrics,
89            duration: Default::default(),
90        }
91    }
92}
93
94/// A wrapper of index parts needed to start the optimization process.
95pub struct Index {
96    name: QualifiedItemName,
97    on: GlobalId,
98    keys: Vec<mz_expr::MirScalarExpr>,
99}
100
101impl Index {
102    /// Construct a new [`Index`]. Arguments are recorded as-is.
103    pub fn new(name: QualifiedItemName, on: GlobalId, keys: Vec<mz_expr::MirScalarExpr>) -> Self {
104        Self { name, on, keys }
105    }
106}
107
108/// The (sealed intermediate) result after:
109///
110/// 1. embedding an [`Index`] into a [`MirDataflowDescription`],
111/// 2. transitively inlining referenced views, and
112/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
113#[derive(Clone, Debug)]
114pub struct GlobalMirPlan {
115    df_desc: MirDataflowDescription,
116    df_meta: DataflowMetainfo,
117}
118
119impl GlobalMirPlan {
120    pub fn df_desc(&self) -> &MirDataflowDescription {
121        &self.df_desc
122    }
123}
124
125/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
126/// `DataflowDescription` with `LIR` plans.
127#[derive(Clone, Debug)]
128pub struct GlobalLirPlan {
129    df_desc: LirDataflowDescription,
130    df_meta: DataflowMetainfo,
131}
132
133impl GlobalLirPlan {
134    pub fn df_desc(&self) -> &LirDataflowDescription {
135        &self.df_desc
136    }
137
138    pub fn df_meta(&self) -> &DataflowMetainfo {
139        &self.df_meta
140    }
141}
142
143impl Optimize<Index> for Optimizer {
144    type To = GlobalMirPlan;
145
146    fn optimize(&mut self, index: Index) -> Result<Self::To, OptimizerError> {
147        let time = Instant::now();
148
149        let on_entry = self.catalog.get_entry(&index.on);
150        let full_name = self
151            .catalog
152            .resolve_full_name(&index.name, on_entry.conn_id());
153        let on_desc = on_entry
154            .desc(&full_name)
155            .expect("can only create indexes on items with a valid description");
156
157        let mut df_builder = {
158            let compute = self.compute_instance.clone();
159            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
160        };
161        let mut df_desc = MirDataflowDescription::new(full_name.to_string());
162
163        df_builder.import_into_dataflow(&index.on, &mut df_desc, &self.config.features)?;
164        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
165
166        let index_desc = IndexDesc {
167            on_id: index.on,
168            key: index.keys.clone(),
169        };
170        df_desc.export_index(self.exported_index_id, index_desc, on_desc.typ().clone());
171
172        // Prepare expressions in the assembled dataflow.
173        let style = ExprPrepStyle::Maintained;
174        df_desc.visit_children(
175            |r| prep_relation_expr(r, style),
176            |s| prep_scalar_expr(s, style),
177        )?;
178
179        // Construct TransformCtx for global optimization.
180        let mut df_meta = DataflowMetainfo::default();
181        let mut transform_ctx = TransformCtx::global(
182            &df_builder,
183            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
184            &self.config.features,
185            &self.typecheck_ctx,
186            &self.repr_typecheck_ctx,
187            &mut df_meta,
188            Some(&self.metrics),
189        );
190        // Run global optimization.
191        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
192
193        if self.config.mode == OptimizeMode::Explain {
194            // Collect the list of indexes used by the dataflow at this point.
195            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
196        }
197
198        // Emit a notice if we are trying to create an empty index.
199        if index.keys.is_empty() {
200            df_meta.push_optimizer_notice_dedup(IndexKeyEmpty);
201        }
202
203        // Emit a notice for each available index identical to the one we are
204        // currently optimizing.
205        for (index_id, idx) in df_builder
206            .indexes_on(index.on)
207            .filter(|(_id, idx)| idx.keys.as_ref() == &index.keys)
208        {
209            df_meta.push_optimizer_notice_dedup(IndexAlreadyExists {
210                index_id,
211                index_key: idx.keys.to_vec(),
212                index_on_id: idx.on,
213                exported_index_id: self.exported_index_id,
214            });
215        }
216
217        self.duration += time.elapsed();
218
219        // Return the (sealed) plan at the end of this optimization step.
220        Ok(GlobalMirPlan { df_desc, df_meta })
221    }
222}
223
224impl Optimize<GlobalMirPlan> for Optimizer {
225    type To = GlobalLirPlan;
226
227    fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
228        let time = Instant::now();
229
230        let GlobalMirPlan {
231            mut df_desc,
232            df_meta,
233        } = plan;
234
235        // Ensure all expressions are normalized before finalizing.
236        for build in df_desc.objects_to_build.iter_mut() {
237            normalize_lets(&mut build.plan.0, &self.config.features)?
238        }
239
240        // Finalize the dataflow. This includes:
241        // - MIR ⇒ LIR lowering
242        // - LIR ⇒ LIR transforms
243        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
244
245        // Trace the pipeline output under `optimize`.
246        trace_plan(&df_desc);
247
248        self.duration += time.elapsed();
249        self.metrics
250            .observe_e2e_optimization_time("index", self.duration);
251
252        // Return the plan at the end of this `optimize` step.
253        Ok(GlobalLirPlan { df_desc, df_meta })
254    }
255}
256
257impl GlobalLirPlan {
258    /// Unwraps the parts of the final result of the optimization pipeline.
259    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
260        (self.df_desc, self.df_meta)
261    }
262}