Skip to main content

mz_adapter/
optimize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
12//! The goal of this crate is to abstract optimizer specifics behind a
13//! high-level interface that is ready to be consumed by the coordinator code in
14//! a future-proof way (that is, the API is taking the upcoming evolution of
15//! these components into account).
16//!
17//! The contents of this crate should have minimal dependencies to the rest of
18//! the coordinator code so we can pull them out as a separate crate in the
19//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//!   required to optimize a statement of type `T` end-to-end (for example
26//!   [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
//! - Each struct implements [`Optimize`] once for each optimization stage. The
//!   `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entrypoint.
31//! - The concrete types used for stage results are opaque structs that are
32//!   specific to the pipeline of that statement type.
33//!   - We use different structs even if two statement types might have
34//!     structurally identical intermediate results. This ensures that client
35//!     code cannot first execute some optimization stages for one type and then
36//!     some stages for a different type.
37//!   - The only way to construct such a struct is by running the [`Optimize`]
38//!     stage that produces it. This ensures that client code cannot interfere
39//!     with the pipeline.
40//!   - In general, the internals of these structs can be accessed only behind a
41//!     shared reference. This ensures that client code can look up information
42//!     from intermediate stages but cannot modify it.
43//!   - Timestamp selection is modeled as a conversion between structs that are
44//!     adjacent in the pipeline using a method called `resolve`.
45//!   - The struct representing the result of the final stage of the
46//!     optimization pipeline can be destructed to access its internals with a
47//!     method called `unapply`.
//! - The `Send` trait bounds on the `Self` and `From` types ensure that
//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::ComputeInstanceId;
69use mz_compute_types::dataflows::DataflowDescription;
70use mz_compute_types::dyncfgs::SUBSCRIBE_SNAPSHOT_OPTIMIZATION;
71use mz_compute_types::plan::Plan;
72use mz_controller_types::ClusterId;
73use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
74use mz_ore::stack::RecursionLimitError;
75use mz_repr::adt::timestamp::TimestampError;
76use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
77use mz_repr::{CatalogItemId, GlobalId};
78use mz_sql::names::{FullItemName, QualifiedItemName};
79use mz_sql::plan::{HirRelationExpr, PlanError};
80use mz_sql::session::metadata::SessionMetadata;
81use mz_sql::session::vars::SystemVars;
82use mz_transform::{MaybeShouldPanic, StatisticsOracle, TransformCtx, TransformError};
83
84use crate::TimestampContext;
85
86// Alias types
87// -----------
88
89/// A type for a [`DataflowDescription`] backed by `Mir~` plans. Used internally
90/// by the optimizer implementations.
91type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
92/// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally
93/// by the optimizer implementations.
94type LirDataflowDescription = DataflowDescription<Plan>;
95
96// Core API
97// --------
98
99/// A trait that represents an optimization stage.
100///
101/// The trait is implemented by structs that encapsulate the context needed to
102/// run an end-to-end optimization pipeline for a specific statement type
103/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
104///
105/// Each implementation represents a concrete optimization stage for a fixed
106/// statement type that consumes an input of type `From` and produces output of
107/// type `Self::To`.
108pub trait Optimize<From> {
109    type To;
110
111    /// Execute the optimization stage, transforming the input plan of type
112    /// `From` to an output plan of type `To`.
113    fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
114
115    /// Like [`Self::optimize`], but additionally ensures that panics occurring
116    /// in the [`Self::optimize`] call are caught and demoted to an
117    /// [`OptimizerError::Internal`] error.
118    ///
119    /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
120    /// should panic, then panic.
121    #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
122    fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
123        mz_transform::catch_unwind_optimize(|| self.optimize(plan))
124    }
125}
126
127// One-shot peek optimizer dispatch
128// --------------------------------
129
130/// The optimizer driving a one-shot statement through the peek sequencing state
131/// machine.
132///
133/// `SELECT` and `EXPLAIN` are optimized by the [`peek::Optimizer`], while `COPY
134/// TO` is optimized by the [`copy_to::Optimizer`]. Both share the same
135/// surrounding state machine (timestamp selection, read holds, off-thread
136/// optimization, …), so this enum lets that shared machinery carry either
137/// optimizer without caring which one it is. The variants are kept distinct
138/// (rather than abstracted behind a trait object) because the downstream stages
139/// need to recover the concrete optimizer and its statement-specific result.
140#[derive(Debug)]
141pub enum PeekOptimizer {
142    /// Optimizer for `SELECT` and `EXPLAIN` statements.
143    Select(peek::Optimizer),
144    /// Optimizer for `COPY TO` statements.
145    CopyTo(copy_to::Optimizer),
146}
147
148/// The global LIR plan produced by [`PeekOptimizer::optimize`], tagged with the
149/// path that produced it.
150#[derive(Debug)]
151pub enum PeekGlobalLirPlan {
152    /// The result of the `SELECT`/`EXPLAIN` pipeline.
153    Select(peek::GlobalLirPlan),
154    /// The result of the `COPY TO` pipeline.
155    CopyTo(copy_to::GlobalLirPlan),
156}
157
158impl PeekOptimizer {
159    /// The cluster that will run the optimized dataflow.
160    pub fn cluster_id(&self) -> ComputeInstanceId {
161        match self {
162            PeekOptimizer::Select(optimizer) => optimizer.cluster_id(),
163            PeekOptimizer::CopyTo(optimizer) => optimizer.cluster_id(),
164        }
165    }
166
167    /// Runs the full one-shot optimization pipeline end-to-end:
168    ///
169    /// 1. HIR ⇒ MIR lowering and local MIR optimization,
170    /// 2. timestamp resolution,
171    /// 3. global MIR optimization, MIR ⇒ LIR lowering, and global LIR
172    ///    optimization.
173    ///
174    /// The pipeline shape is identical for both variants; only the concrete
175    /// (statement-specific) plan types differ, so the steps are shared via
176    /// [`optimize_oneshot`].
177    pub fn optimize(
178        &mut self,
179        raw_expr: HirRelationExpr,
180        timestamp_ctx: TimestampContext,
181        session: &dyn SessionMetadata,
182        stats: Box<dyn StatisticsOracle>,
183    ) -> Result<PeekGlobalLirPlan, OptimizerError> {
184        match self {
185            PeekOptimizer::Select(optimizer) => {
186                let plan = optimize_oneshot(optimizer, raw_expr, |local_mir_plan| {
187                    local_mir_plan.resolve(timestamp_ctx, session, stats)
188                })?;
189                Ok(PeekGlobalLirPlan::Select(plan))
190            }
191            PeekOptimizer::CopyTo(optimizer) => {
192                let plan = optimize_oneshot(optimizer, raw_expr, |local_mir_plan| {
193                    local_mir_plan.resolve(timestamp_ctx, session, stats)
194                })?;
195                Ok(PeekGlobalLirPlan::CopyTo(plan))
196            }
197        }
198    }
199
200    /// Consumes `self`, returning the inner [`peek::Optimizer`] if this is the
201    /// `SELECT`/`EXPLAIN` path and `None` otherwise.
202    pub fn into_select(self) -> Option<peek::Optimizer> {
203        match self {
204            PeekOptimizer::Select(optimizer) => Some(optimizer),
205            PeekOptimizer::CopyTo(_) => None,
206        }
207    }
208
209    /// Consumes `self`, returning the inner [`copy_to::Optimizer`] if this is
210    /// the `COPY TO` path and `None` otherwise.
211    pub fn into_copy_to(self) -> Option<copy_to::Optimizer> {
212        match self {
213            PeekOptimizer::CopyTo(optimizer) => Some(optimizer),
214            PeekOptimizer::Select(_) => None,
215        }
216    }
217}
218
219/// Runs the shared one-shot optimization pipeline for a single optimizer.
220///
221/// This factors out the (otherwise duplicated) HIR ⇒ local MIR ⇒ resolve ⇒
222/// global LIR sequence that is common to the `SELECT`/`EXPLAIN` and `COPY TO`
223/// paths. The `resolve` closure attaches the timestamp/session/stats context to
224/// the local plan; it is path-specific only in the concrete plan type it
225/// operates on.
226fn optimize_oneshot<O, LocalPlan, ResolvedPlan, GlobalPlan>(
227    optimizer: &mut O,
228    raw_expr: HirRelationExpr,
229    resolve: impl FnOnce(LocalPlan) -> ResolvedPlan,
230) -> Result<GlobalPlan, OptimizerError>
231where
232    O: Optimize<HirRelationExpr, To = LocalPlan> + Optimize<ResolvedPlan, To = GlobalPlan>,
233{
234    // HIR ⇒ MIR lowering and MIR optimization (local).
235    let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
236    // Attach resolved context required to continue the pipeline.
237    let resolved_mir_plan = resolve(local_mir_plan);
238    // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global).
239    optimizer.catch_unwind_optimize(resolved_mir_plan)
240}
241
242// Optimizer configuration
243// -----------------------
244
245/// Feature flags for the optimizer.
246///
247/// To add a new feature flag, do the following steps:
248///
249/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
250///    and allow engineers to set a system-wide override:
251///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
252///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
253///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
254///
255/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
256///    inspect plan differences before deploying the optimizer changes:
257///    1. Add the flag to the `ExplainPlanOptionName` definition.
258///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
259///       ...)` macro call.
260///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
261///       [`mz_repr::explain::ExplainConfig`].
262///
263/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
264///    engineers to experiment with runtime differences before deploying the
265///    optimizer changes:
266///    1. Add the flag to the `ClusterFeatureName` definition.
267///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
268///       macro call.
269///    3. Extend the `let optimizer_feature_overrides = ...` call in
270///       `plan_create_cluster`.
271#[derive(Clone, Debug)]
272pub struct OptimizerConfig {
273    /// The mode in which the optimizer runs.
274    pub mode: OptimizeMode,
275    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
276    ///
277    /// This means that it will not consider catalog items (more specifically
278    /// indexes) with [`GlobalId`] greater or equal than the one provided here.
279    pub replan: Option<GlobalId>,
280    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
281    /// Enforced if `timing` is set.
282    pub no_fast_path: bool,
283    // If set, allow some additional queries down the Persist fast path when we believe
284    // the orderings are compatible.
285    persist_fast_path_order: bool,
286    // Enable calculating with_snapshot metadata for subscribes.
287    subscribe_snapshot_optimization: bool,
288    /// Optimizer feature flags.
289    pub features: OptimizerFeatures,
290}
291
/// The mode in which the optimizer runs (see [`OptimizerConfig::mode`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// The optimized statement is going to be executed.
    Execute,
    /// The optimized statement is going to be explained.
    Explain,
}
299
300impl From<&SystemVars> for OptimizerConfig {
301    fn from(vars: &SystemVars) -> Self {
302        Self {
303            mode: OptimizeMode::Execute,
304            replan: None,
305            no_fast_path: false,
306            persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
307            subscribe_snapshot_optimization: SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(vars.dyncfgs()),
308            features: OptimizerFeatures::from(vars),
309        }
310    }
311}
312
313/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
314impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
315    fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
316        self.features = self.features.override_from(overrides);
317        self
318    }
319}
320
321/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
322impl OverrideFrom<ExplainContext> for OptimizerConfig {
323    fn override_from(mut self, ctx: &ExplainContext) -> Self {
324        let ExplainContext::Plan(ctx) = ctx else {
325            return self; // Return immediately for all other contexts.
326        };
327
328        // Override general parameters.
329        self.mode = OptimizeMode::Explain;
330        self.replan = ctx.replan;
331        self.no_fast_path = ctx.config.no_fast_path;
332
333        // Override feature flags that can be enabled in the EXPLAIN config.
334        self.features = self.features.override_from(&ctx.config.features);
335
336        // Return the final result.
337        self
338    }
339}
340
341impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
342    fn from(config: &OptimizerConfig) -> Self {
343        Self {
344            enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
345            enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
346            enable_cast_elimination: config.features.enable_cast_elimination,
347            enable_simplify_quantified_comparisons: config
348                .features
349                .enable_simplify_quantified_comparisons,
350        }
351    }
352}
353
354// OptimizerCatalog
355// ===============
356
357pub trait OptimizerCatalog: Debug + Send + Sync {
358    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
359    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
360    fn resolve_full_name(
361        &self,
362        name: &QualifiedItemName,
363        conn_id: Option<&ConnectionId>,
364    ) -> FullItemName;
365
366    /// Returns all indexes on the given object and cluster known in the
367    /// catalog.
368    fn get_indexes_on(
369        &self,
370        id: GlobalId,
371        cluster: ClusterId,
372    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
373}
374
375// OptimizerError
376// ===============
377
378/// Error types that can be generated during optimization.
379#[derive(Debug, thiserror::Error)]
380pub enum OptimizerError {
381    #[error("{0}")]
382    PlanError(#[from] PlanError),
383    #[error("{0}")]
384    RecursionLimitError(#[from] RecursionLimitError),
385    #[error("{0}")]
386    TransformError(#[from] TransformError),
387    #[error("{0}")]
388    EvalError(#[from] EvalError),
389    #[error("cannot materialize call to {0}")]
390    UnmaterializableFunction(UnmaterializableFunc),
391    #[error("cannot call {func} in {context} ")]
392    UncallableFunction {
393        func: UnmaterializableFunc,
394        context: &'static str,
395    },
396    #[error("access to function {0} is restricted")]
397    RestrictedFunction(UnmaterializableFunc),
398    #[error("{0}")]
399    UnsupportedTemporalExpression(String),
400    /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
401    /// catch it and swallow it in some cases.
402    #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
403    InternalUnsafeMfpPlan(String),
404    #[error("internal optimizer error: {0}")]
405    Internal(String),
406}
407
408impl From<String> for OptimizerError {
409    fn from(msg: String) -> Self {
410        Self::Internal(msg)
411    }
412}
413
414impl OptimizerError {
415    pub fn detail(&self) -> Option<String> {
416        match self {
417            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
418                Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
419            }
420            Self::RestrictedFunction(_) => Some(
421                "Access to system catalog objects is restricted for this role. \
422                Contact your administrator if you need access."
423                    .into(),
424            ),
425            _ => None,
426        }
427    }
428
429    pub fn hint(&self) -> Option<String> {
430        match self {
431            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
432                Some("In temporal filters `mz_now()` may work instead.".into())
433            }
434            _ => None,
435        }
436    }
437}
438
439impl From<TimestampError> for OptimizerError {
440    fn from(value: TimestampError) -> Self {
441        OptimizerError::EvalError(EvalError::from(value))
442    }
443}
444
445impl From<anyhow::Error> for OptimizerError {
446    fn from(value: anyhow::Error) -> Self {
447        OptimizerError::Internal(value.to_string())
448    }
449}
450
451impl MaybeShouldPanic for OptimizerError {
452    fn should_panic(&self) -> Option<String> {
453        match self {
454            OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
455                Some(msg.to_string())
456            }
457            _ => None,
458        }
459    }
460}
461
462// Tracing helpers
463// ---------------
464
465#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
466fn optimize_mir_local(
467    expr: MirRelationExpr,
468    ctx: &mut TransformCtx,
469) -> Result<OptimizedMirRelationExpr, OptimizerError> {
470    #[allow(deprecated)]
471    let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
472    let expr = optimizer.optimize(expr, ctx)?;
473
474    // Trace the result of this phase.
475    mz_repr::explain::trace_plan(expr.as_inner());
476
477    Ok::<_, OptimizerError>(expr)
478}
479
480/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
481/// running it, and tracing the result plan.
482#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
483fn optimize_mir_constant(
484    expr: MirRelationExpr,
485    ctx: &mut TransformCtx,
486    limit: bool,
487) -> Result<MirRelationExpr, OptimizerError> {
488    let optimizer = mz_transform::Optimizer::constant_optimizer(ctx, limit);
489    let expr = optimizer.optimize(expr, ctx)?;
490
491    // Trace the result of this phase.
492    mz_repr::explain::trace_plan(expr.as_inner());
493
494    Ok::<_, OptimizerError>(expr.0)
495}
496
/// Traces the given plan inside a debug-level span on the `"optimizer"`
/// target. The span name is the literal passed after `at:`.
macro_rules! trace_plan {
    (at: $span_name:literal, $traced:expr) => {
        tracing::debug_span!(target: "optimizer", $span_name).in_scope(|| {
            mz_repr::explain::trace_plan($traced);
        });
    };
}
504
505use trace_plan;
506
507use crate::coord::ExplainContext;