mz_adapter/
optimize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
12//! The goal of this crate is to abstract optimizer specifics behind a
13//! high-level interface that is ready to be consumed by the coordinator code in
14//! a future-proof way (that is, the API is taking the upcoming evolution of
15//! these components into account).
16//!
17//! The contents of this crate should have minimal dependencies to the rest of
18//! the coordinator code so we can pull them out as a separate crate in the
19//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//!   required to optimize a statement of type `T` end-to-end (for example
26//!   [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
27//! - Each struct implements [`Optimize`] once for each optimization stage. The
28//!   `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entry point.
31//! - The concrete types used for stage results are opaque structs that are
32//!   specific to the pipeline of that statement type.
33//!   - We use different structs even if two statement types might have
34//!     structurally identical intermediate results. This ensures that client
35//!     code cannot first execute some optimization stages for one type and then
36//!     some stages for a different type.
37//!   - The only way to construct such a struct is by running the [`Optimize`]
38//!     stage that produces it. This ensures that client code cannot interfere
39//!     with the pipeline.
40//!   - In general, the internals of these structs can be accessed only behind a
41//!     shared reference. This ensures that client code can look up information
42//!     from intermediate stages but cannot modify it.
43//!   - Timestamp selection is modeled as a conversion between structs that are
44//!     adjacent in the pipeline using a method called `resolve`.
45//!   - The struct representing the result of the final stage of the
46//!     optimization pipeline can be destructed to access its internals with a
47//!     method called `unapply`.
48//! - The `Send` trait bounds on the `Self` and `From` types ensure that
49//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::plan::Plan;
70use mz_controller_types::ClusterId;
71use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
72use mz_ore::stack::RecursionLimitError;
73use mz_repr::adt::timestamp::TimestampError;
74use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
75use mz_repr::{CatalogItemId, GlobalId};
76use mz_sql::names::{FullItemName, QualifiedItemName};
77use mz_sql::plan::PlanError;
78use mz_sql::session::vars::SystemVars;
79use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
80
81// Alias types
82// -----------
83
84/// A type for a [`DataflowDescription`] backed by `Mir~` plans. Used internally
85/// by the optimizer implementations.
86type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
87/// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally
88/// by the optimizer implementations.
89type LirDataflowDescription = DataflowDescription<Plan>;
90
91// Core API
92// --------
93
94/// A trait that represents an optimization stage.
95///
96/// The trait is implemented by structs that encapsulate the context needed to
97/// run an end-to-end optimization pipeline for a specific statement type
98/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
99///
100/// Each implementation represents a concrete optimization stage for a fixed
101/// statement type that consumes an input of type `From` and produces output of
102/// type `Self::To`.
103pub trait Optimize<From> {
104    type To;
105
106    /// Execute the optimization stage, transforming the input plan of type
107    /// `From` to an output plan of type `To`.
108    fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
109
110    /// Like [`Self::optimize`], but additionally ensures that panics occurring
111    /// in the [`Self::optimize`] call are caught and demoted to an
112    /// [`OptimizerError::Internal`] error.
113    ///
114    /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
115    /// should panic, then panic.
116    #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
117    fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
118        mz_transform::catch_unwind_optimize(|| self.optimize(plan))
119    }
120}
121
122// Optimizer configuration
123// -----------------------
124
125/// Feature flags for the optimizer.
126///
127/// To add a new feature flag, do the following steps:
128///
129/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
130///    and allow engineers to set a system-wide override:
131///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
132///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
133///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
134///
135/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
136///    inspect plan differences before deploying the optimizer changes:
137///    1. Add the flag to the `ExplainPlanOptionName` definition.
138///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
139///       ...)` macro call.
140///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
141///       [`mz_repr::explain::ExplainConfig`].
142///
143/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
144///    engineers to experiment with runtime differences before deploying the
145///    optimizer changes:
146///    1. Add the flag to the `ClusterFeatureName` definition.
147///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
148///       macro call.
149///    3. Extend the `let optimizer_feature_overrides = ...` call in
150///       `plan_create_cluster`.
151#[derive(Clone, Debug)]
152pub struct OptimizerConfig {
153    /// The mode in which the optimizer runs.
154    pub mode: OptimizeMode,
155    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
156    ///
157    /// This means that it will not consider catalog items (more specifically
158    /// indexes) with [`GlobalId`] greater or equal than the one provided here.
159    pub replan: Option<GlobalId>,
160    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
161    /// Enforced if `timing` is set.
162    pub no_fast_path: bool,
163    // If set, allow some additional queries down the Persist fast path when we believe
164    // the orderings are compatible.
165    persist_fast_path_order: bool,
166    /// Optimizer feature flags.
167    pub features: OptimizerFeatures,
168}
169
/// The purpose for which a statement is being optimized.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// The optimized statement is going to be executed.
    Execute,
    /// The optimized statement is going to be explained.
    Explain,
}
177
178impl From<&SystemVars> for OptimizerConfig {
179    fn from(vars: &SystemVars) -> Self {
180        Self {
181            mode: OptimizeMode::Execute,
182            replan: None,
183            no_fast_path: false,
184            persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
185            features: OptimizerFeatures::from(vars),
186        }
187    }
188}
189
190/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
191impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
192    fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
193        self.features = self.features.override_from(overrides);
194        self
195    }
196}
197
198/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
199impl OverrideFrom<ExplainContext> for OptimizerConfig {
200    fn override_from(mut self, ctx: &ExplainContext) -> Self {
201        let ExplainContext::Plan(ctx) = ctx else {
202            return self; // Return immediately for all other contexts.
203        };
204
205        // Override general parameters.
206        self.mode = OptimizeMode::Explain;
207        self.replan = ctx.replan;
208        self.no_fast_path = ctx.config.no_fast_path;
209
210        // Override feature flags that can be enabled in the EXPLAIN config.
211        self.features = self.features.override_from(&ctx.config.features);
212
213        // Return the final result.
214        self
215    }
216}
217
218impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
219    fn from(config: &OptimizerConfig) -> Self {
220        Self {
221            enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
222            enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
223            enable_guard_subquery_tablefunc: config.features.enable_guard_subquery_tablefunc,
224            enable_cast_elimination: config.features.enable_cast_elimination,
225        }
226    }
227}
228
229// OptimizerCatalog
230// ===============
231
232pub trait OptimizerCatalog: Debug + Send + Sync {
233    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
234    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
235    fn resolve_full_name(
236        &self,
237        name: &QualifiedItemName,
238        conn_id: Option<&ConnectionId>,
239    ) -> FullItemName;
240
241    /// Returns all indexes on the given object and cluster known in the
242    /// catalog.
243    fn get_indexes_on(
244        &self,
245        id: GlobalId,
246        cluster: ClusterId,
247    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
248}
249
250// OptimizerError
251// ===============
252
253/// Error types that can be generated during optimization.
254#[derive(Debug, thiserror::Error)]
255pub enum OptimizerError {
256    #[error("{0}")]
257    PlanError(#[from] PlanError),
258    #[error("{0}")]
259    RecursionLimitError(#[from] RecursionLimitError),
260    #[error("{0}")]
261    TransformError(#[from] TransformError),
262    #[error("{0}")]
263    EvalError(#[from] EvalError),
264    #[error("cannot materialize call to {0}")]
265    UnmaterializableFunction(UnmaterializableFunc),
266    #[error("cannot call {func} in {context} ")]
267    UncallableFunction {
268        func: UnmaterializableFunc,
269        context: &'static str,
270    },
271    #[error("{0}")]
272    UnsupportedTemporalExpression(String),
273    /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
274    /// catch it and swallow it in some cases.
275    #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
276    InternalUnsafeMfpPlan(String),
277    #[error("internal optimizer error: {0}")]
278    Internal(String),
279}
280
281impl From<String> for OptimizerError {
282    fn from(msg: String) -> Self {
283        Self::Internal(msg)
284    }
285}
286
287impl OptimizerError {
288    pub fn detail(&self) -> Option<String> {
289        match self {
290            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
291                Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
292            }
293            _ => None,
294        }
295    }
296
297    pub fn hint(&self) -> Option<String> {
298        match self {
299            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
300                Some("In temporal filters `mz_now()` may work instead.".into())
301            }
302            _ => None,
303        }
304    }
305}
306
307impl From<TimestampError> for OptimizerError {
308    fn from(value: TimestampError) -> Self {
309        OptimizerError::EvalError(EvalError::from(value))
310    }
311}
312
313impl From<anyhow::Error> for OptimizerError {
314    fn from(value: anyhow::Error) -> Self {
315        OptimizerError::Internal(value.to_string())
316    }
317}
318
319impl MaybeShouldPanic for OptimizerError {
320    fn should_panic(&self) -> Option<String> {
321        match self {
322            OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
323                Some(msg.to_string())
324            }
325            _ => None,
326        }
327    }
328}
329
330// Tracing helpers
331// ---------------
332
333#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
334fn optimize_mir_local(
335    expr: MirRelationExpr,
336    ctx: &mut TransformCtx,
337) -> Result<OptimizedMirRelationExpr, OptimizerError> {
338    #[allow(deprecated)]
339    let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
340    let expr = optimizer.optimize(expr, ctx)?;
341
342    // Trace the result of this phase.
343    mz_repr::explain::trace_plan(expr.as_inner());
344
345    Ok::<_, OptimizerError>(expr)
346}
347
348/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
349/// running it, and tracing the result plan.
350#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
351fn optimize_mir_constant(
352    expr: MirRelationExpr,
353    ctx: &mut TransformCtx,
354    limit: bool,
355) -> Result<MirRelationExpr, OptimizerError> {
356    let optimizer = mz_transform::Optimizer::constant_optimizer(ctx, limit);
357    let expr = optimizer.optimize(expr, ctx)?;
358
359    // Trace the result of this phase.
360    mz_repr::explain::trace_plan(expr.as_inner());
361
362    Ok::<_, OptimizerError>(expr.0)
363}
364
/// Traces `$plan` via `mz_repr::explain::trace_plan` inside a debug-level
/// span named `$span` with target `"optimizer"`.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        tracing::debug_span!(target: "optimizer", $span)
            .in_scope(|| mz_repr::explain::trace_plan($plan));
    }
}
372
373use trace_plan;
374
375use crate::coord::ExplainContext;