Skip to main content

mz_adapter/
optimize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
12//! The goal of this crate is to abstract optimizer specifics behind a
13//! high-level interface that is ready to be consumed by the coordinator code in
14//! a future-proof way (that is, the API is taking the upcoming evolution of
15//! these components into account).
16//!
17//! The contents of this crate should have minimal dependencies to the rest of
18//! the coordinator code so we can pull them out as a separate crate in the
19//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//!   required to optimize a statement of type `T` end-to-end (for example
26//!   [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
//! - Each struct implements [`Optimize`] once for each optimization stage. The
//!   `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entry point.
31//! - The concrete types used for stage results are opaque structs that are
32//!   specific to the pipeline of that statement type.
33//!   - We use different structs even if two statement types might have
34//!     structurally identical intermediate results. This ensures that client
35//!     code cannot first execute some optimization stages for one type and then
36//!     some stages for a different type.
37//!   - The only way to construct such a struct is by running the [`Optimize`]
38//!     stage that produces it. This ensures that client code cannot interfere
39//!     with the pipeline.
40//!   - In general, the internals of these structs can be accessed only behind a
41//!     shared reference. This ensures that client code can look up information
42//!     from intermediate stages but cannot modify it.
43//!   - Timestamp selection is modeled as a conversion between structs that are
44//!     adjacent in the pipeline using a method called `resolve`.
45//!   - The struct representing the result of the final stage of the
46//!     optimization pipeline can be destructed to access its internals with a
47//!     method called `unapply`.
//! - The `Send` trait bounds on the `Self` and `From` types ensure that
//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::dyncfgs::SUBSCRIBE_SNAPSHOT_OPTIMIZATION;
70use mz_compute_types::plan::Plan;
71use mz_controller_types::ClusterId;
72use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
73use mz_ore::stack::RecursionLimitError;
74use mz_repr::adt::timestamp::TimestampError;
75use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
76use mz_repr::{CatalogItemId, GlobalId};
77use mz_sql::names::{FullItemName, QualifiedItemName};
78use mz_sql::plan::PlanError;
79use mz_sql::session::vars::SystemVars;
80use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
81
82// Alias types
83// -----------
84
/// A type for a [`DataflowDescription`] backed by `Mir~` plans (concretely,
/// [`OptimizedMirRelationExpr`]). Used internally by the optimizer
/// implementations.
type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
/// A type for a [`DataflowDescription`] backed by `Lir~` plans (concretely,
/// [`Plan`]). Used internally by the optimizer implementations.
type LirDataflowDescription = DataflowDescription<Plan>;
91
92// Core API
93// --------
94
/// A trait that represents an optimization stage.
///
/// The trait is implemented by structs that encapsulate the context needed to
/// run an end-to-end optimization pipeline for a specific statement type
/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
///
/// Each implementation represents a concrete optimization stage for a fixed
/// statement type that consumes an input of type `From` and produces output of
/// type `Self::To`.
///
/// NOTE(review): the module-level docs mention `Send` bounds on `Self` and
/// `From`, but this declaration carries no such bounds — confirm which of the
/// two is out of date.
pub trait Optimize<From> {
    /// The output produced by this optimization stage.
    type To;

    /// Execute the optimization stage, transforming the input plan of type
    /// `From` to an output plan of type `To`.
    ///
    /// # Errors
    ///
    /// Returns an [`OptimizerError`] if the stage fails.
    fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;

    /// Like [`Self::optimize`], but additionally ensures that panics occurring
    /// in the [`Self::optimize`] call are caught and demoted to an
    /// [`OptimizerError::Internal`] error.
    ///
    /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
    /// should panic, then panic.
    ///
    /// This default implementation delegates to
    /// `mz_transform::catch_unwind_optimize`, passing it a closure that runs
    /// [`Self::optimize`] on `plan`.
    #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
    fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
        mz_transform::catch_unwind_optimize(|| self.optimize(plan))
    }
}
122
123// Optimizer configuration
124// -----------------------
125
/// Configuration for the optimizer, including its feature flags.
///
/// To add a new feature flag, do the following steps:
///
/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
///    and allow engineers to set a system-wide override:
///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
///
/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
///    inspect plan differences before deploying the optimizer changes:
///    1. Add the flag to the `ExplainPlanOptionName` definition.
///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
///       ...)` macro call.
///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
///       [`mz_repr::explain::ExplainConfig`].
///
/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
///    engineers to experiment with runtime differences before deploying the
///    optimizer changes:
///    1. Add the flag to the `ClusterFeatureName` definition.
///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
///       macro call.
///    3. Extend the `let optimizer_feature_overrides = ...` call in
///       `plan_create_cluster`.
#[derive(Clone, Debug)]
pub struct OptimizerConfig {
    /// The mode in which the optimizer runs.
    pub mode: OptimizeMode,
    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
    ///
    /// This means that it will not consider catalog items (more specifically
    /// indexes) with a [`GlobalId`] greater than or equal to the one provided
    /// here.
    pub replan: Option<GlobalId>,
    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
    /// Enforced if `timing` is set.
    pub no_fast_path: bool,
    /// If set, allow some additional queries down the Persist fast path when we believe
    /// the orderings are compatible.
    persist_fast_path_order: bool,
    /// Enable calculating with_snapshot metadata for subscribes.
    subscribe_snapshot_optimization: bool,
    /// Optimizer feature flags.
    pub features: OptimizerFeatures,
}
172
/// The mode in which the optimizer runs (see [`OptimizerConfig::mode`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// A mode where the optimized statement is executed.
    Execute,
    /// A mode where the optimized statement is explained.
    Explain,
}
180
181impl From<&SystemVars> for OptimizerConfig {
182    fn from(vars: &SystemVars) -> Self {
183        Self {
184            mode: OptimizeMode::Execute,
185            replan: None,
186            no_fast_path: false,
187            persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
188            subscribe_snapshot_optimization: SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(vars.dyncfgs()),
189            features: OptimizerFeatures::from(vars),
190        }
191    }
192}
193
194/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
195impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
196    fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
197        self.features = self.features.override_from(overrides);
198        self
199    }
200}
201
202/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
203impl OverrideFrom<ExplainContext> for OptimizerConfig {
204    fn override_from(mut self, ctx: &ExplainContext) -> Self {
205        let ExplainContext::Plan(ctx) = ctx else {
206            return self; // Return immediately for all other contexts.
207        };
208
209        // Override general parameters.
210        self.mode = OptimizeMode::Explain;
211        self.replan = ctx.replan;
212        self.no_fast_path = ctx.config.no_fast_path;
213
214        // Override feature flags that can be enabled in the EXPLAIN config.
215        self.features = self.features.override_from(&ctx.config.features);
216
217        // Return the final result.
218        self
219    }
220}
221
222impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
223    fn from(config: &OptimizerConfig) -> Self {
224        Self {
225            enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
226            enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
227            enable_guard_subquery_tablefunc: config.features.enable_guard_subquery_tablefunc,
228            enable_cast_elimination: config.features.enable_cast_elimination,
229        }
230    }
231}
232
233// OptimizerCatalog
234// ===============
235
/// Catalog lookups exposed to the optimizer.
///
/// Implementors must be [`Debug`] as well as `Send + Sync`, as required by the
/// supertrait bounds.
pub trait OptimizerCatalog: Debug + Send + Sync {
    /// Returns the catalog entry for the collection with the given
    /// [`GlobalId`].
    ///
    /// NOTE(review): the return type is not an `Option`, so unknown IDs are
    /// presumably treated as a bug by implementors — confirm.
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
    /// Returns a reference to the catalog entry for the item with the given
    /// [`CatalogItemId`].
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
    /// Resolves a [`QualifiedItemName`] to a [`FullItemName`].
    ///
    /// NOTE(review): `conn_id` is presumably used to resolve names that are
    /// specific to a connection (e.g. temporary items) — confirm against the
    /// implementation.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName;

    /// Returns all indexes on the given object and cluster known in the
    /// catalog.
    ///
    /// Each item pairs the [`GlobalId`] of an index with a reference to its
    /// [`Index`] definition.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
}
253
254// OptimizerError
255// ===============
256
257/// Error types that can be generated during optimization.
258#[derive(Debug, thiserror::Error)]
259pub enum OptimizerError {
260    #[error("{0}")]
261    PlanError(#[from] PlanError),
262    #[error("{0}")]
263    RecursionLimitError(#[from] RecursionLimitError),
264    #[error("{0}")]
265    TransformError(#[from] TransformError),
266    #[error("{0}")]
267    EvalError(#[from] EvalError),
268    #[error("cannot materialize call to {0}")]
269    UnmaterializableFunction(UnmaterializableFunc),
270    #[error("cannot call {func} in {context} ")]
271    UncallableFunction {
272        func: UnmaterializableFunc,
273        context: &'static str,
274    },
275    #[error("{0}")]
276    UnsupportedTemporalExpression(String),
277    /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
278    /// catch it and swallow it in some cases.
279    #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
280    InternalUnsafeMfpPlan(String),
281    #[error("internal optimizer error: {0}")]
282    Internal(String),
283}
284
285impl From<String> for OptimizerError {
286    fn from(msg: String) -> Self {
287        Self::Internal(msg)
288    }
289}
290
291impl OptimizerError {
292    pub fn detail(&self) -> Option<String> {
293        match self {
294            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
295                Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
296            }
297            _ => None,
298        }
299    }
300
301    pub fn hint(&self) -> Option<String> {
302        match self {
303            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
304                Some("In temporal filters `mz_now()` may work instead.".into())
305            }
306            _ => None,
307        }
308    }
309}
310
311impl From<TimestampError> for OptimizerError {
312    fn from(value: TimestampError) -> Self {
313        OptimizerError::EvalError(EvalError::from(value))
314    }
315}
316
317impl From<anyhow::Error> for OptimizerError {
318    fn from(value: anyhow::Error) -> Self {
319        OptimizerError::Internal(value.to_string())
320    }
321}
322
323impl MaybeShouldPanic for OptimizerError {
324    fn should_panic(&self) -> Option<String> {
325        match self {
326            OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
327                Some(msg.to_string())
328            }
329            _ => None,
330        }
331    }
332}
333
334// Tracing helpers
335// ---------------
336
337#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
338fn optimize_mir_local(
339    expr: MirRelationExpr,
340    ctx: &mut TransformCtx,
341) -> Result<OptimizedMirRelationExpr, OptimizerError> {
342    #[allow(deprecated)]
343    let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
344    let expr = optimizer.optimize(expr, ctx)?;
345
346    // Trace the result of this phase.
347    mz_repr::explain::trace_plan(expr.as_inner());
348
349    Ok::<_, OptimizerError>(expr)
350}
351
352/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
353/// running it, and tracing the result plan.
354#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
355fn optimize_mir_constant(
356    expr: MirRelationExpr,
357    ctx: &mut TransformCtx,
358    limit: bool,
359) -> Result<MirRelationExpr, OptimizerError> {
360    let optimizer = mz_transform::Optimizer::constant_optimizer(ctx, limit);
361    let expr = optimizer.optimize(expr, ctx)?;
362
363    // Trace the result of this phase.
364    mz_repr::explain::trace_plan(expr.as_inner());
365
366    Ok::<_, OptimizerError>(expr.0)
367}
368
/// Traces `$plan` with [`mz_repr::explain::trace_plan`] from within a
/// debug-level span named by the `$span` literal (target `"optimizer"`).
///
/// Usage: `trace_plan!(at: "span_name", &plan)`.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
            mz_repr::explain::trace_plan($plan);
        });
    }
}
376
377use trace_plan;
378
379use crate::coord::ExplainContext;