mz_adapter/
optimize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
//! The goal of this module is to abstract optimizer specifics behind a
13//! high-level interface that is ready to be consumed by the coordinator code in
14//! a future-proof way (that is, the API is taking the upcoming evolution of
15//! these components into account).
16//!
//! The contents of this module should have minimal dependencies on the rest of
18//! the coordinator code so we can pull them out as a separate crate in the
19//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//!   required to optimize a statement of type `T` end-to-end (for example
26//!   [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
27//! - Each struct implements [`Optimize`] once for each optimization stage. The
28//!   `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entry point.
31//! - The concrete types used for stage results are opaque structs that are
32//!   specific to the pipeline of that statement type.
33//!   - We use different structs even if two statement types might have
34//!     structurally identical intermediate results. This ensures that client
35//!     code cannot first execute some optimization stages for one type and then
36//!     some stages for a different type.
37//!   - The only way to construct such a struct is by running the [`Optimize`]
38//!     stage that produces it. This ensures that client code cannot interfere
39//!     with the pipeline.
40//!   - In general, the internals of these structs can be accessed only behind a
41//!     shared reference. This ensures that client code can look up information
42//!     from intermediate stages but cannot modify it.
43//!   - Timestamp selection is modeled as a conversion between structs that are
44//!     adjacent in the pipeline using a method called `resolve`.
45//!   - The struct representing the result of the final stage of the
46//!     optimization pipeline can be destructed to access its internals with a
47//!     method called `unapply`.
48//! - The `Send` trait bounds on the `Self` and `From` types ensure that
49//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::plan::Plan;
70use mz_controller_types::ClusterId;
71use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
72use mz_ore::stack::RecursionLimitError;
73use mz_repr::adt::timestamp::TimestampError;
74use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
75use mz_repr::{CatalogItemId, GlobalId};
76use mz_sql::names::{FullItemName, QualifiedItemName};
77use mz_sql::plan::PlanError;
78use mz_sql::session::vars::SystemVars;
79use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
80
81// Alias types
82// -----------
83
84/// A type for a [`DataflowDescription`] backed by `Mir~` plans. Used internally
85/// by the optimizer implementations.
86type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
87/// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally
88/// by the optimizer implementations.
89type LirDataflowDescription = DataflowDescription<Plan>;
90
91// Core API
92// --------
93
94/// A trait that represents an optimization stage.
95///
96/// The trait is implemented by structs that encapsulate the context needed to
97/// run an end-to-end optimization pipeline for a specific statement type
98/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
99///
100/// Each implementation represents a concrete optimization stage for a fixed
101/// statement type that consumes an input of type `From` and produces output of
102/// type `Self::To`.
103///
104/// The generic lifetime `'ctx` models the lifetime of the optimizer context and
105/// can be passed to the optimizer struct and the `Self::To` types.
106///
107/// The `'s: 'ctx` bound in the `optimize` method call ensures that an optimizer
108/// instance can run an optimization stage that produces a `Self::To` with
109/// `&'ctx` references.
110pub trait Optimize<From>: Send
111where
112    From: Send,
113{
114    type To: Send;
115
116    /// Execute the optimization stage, transforming the input plan of type
117    /// `From` to an output plan of type `To`.
118    fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
119
120    /// Like [`Self::optimize`], but additionally ensures that panics occurring
121    /// in the [`Self::optimize`] call are caught and demoted to an
122    /// [`OptimizerError::Internal`] error.
123    ///
124    /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
125    /// should panic, then panic.
126    #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
127    fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
128        mz_transform::catch_unwind_optimize(|| self.optimize(plan))
129    }
130
131    /// Execute the optimization stage and panic if an error occurs.
132    ///
133    /// See [`Optimize::optimize`].
134    #[allow(dead_code)] // This function is never used, but it's useful to keep around.
135    fn must_optimize(&mut self, expr: From) -> Self::To {
136        match self.optimize(expr) {
137            Ok(ok) => ok,
138            Err(err) => panic!("must_optimize call failed: {err}"),
139        }
140    }
141}
142
143// Optimizer configuration
144// -----------------------
145
146/// Feature flags for the optimizer.
147///
148/// To add a new feature flag, do the following steps:
149///
150/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
151///    and allow engineers to set a system-wide override:
152///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
153///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
154///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
155///
156/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
157///    inspect plan differences before deploying the optimizer changes:
158///    1. Add the flag to the `ExplainPlanOptionName` definition.
159///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
160///       ...)` macro call.
161///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
162///       [`mz_repr::explain::ExplainConfig`].
163///
164/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
165///    engineers to experiment with runtime differences before deploying the
166///    optimizer changes:
167///    1. Add the flag to the `ClusterFeatureName` definition.
168///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
169///       macro call.
170///    3. Extend the `let optimizer_feature_overrides = ...` call in
171///       `plan_create_cluster`.
172#[derive(Clone, Debug)]
173pub struct OptimizerConfig {
174    /// The mode in which the optimizer runs.
175    pub mode: OptimizeMode,
176    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
177    ///
178    /// This means that it will not consider catalog items (more specifically
179    /// indexes) with [`GlobalId`] greater or equal than the one provided here.
180    pub replan: Option<GlobalId>,
181    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
182    /// Enforced if `timing` is set.
183    pub no_fast_path: bool,
184    // If set, allow some additional queries down the Persist fast path when we believe
185    // the orderings are compatible.
186    persist_fast_path_order: bool,
187    /// Optimizer feature flags.
188    pub features: OptimizerFeatures,
189}
190
/// Says what the result of an optimization run is going to be used for.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum OptimizeMode {
    /// The optimized statement is going to be executed.
    Execute,
    /// The optimized statement is only going to be explained (this mode is
    /// selected when overriding the config from an `EXPLAIN` plan context).
    Explain,
}
198
199impl From<&SystemVars> for OptimizerConfig {
200    fn from(vars: &SystemVars) -> Self {
201        Self {
202            mode: OptimizeMode::Execute,
203            replan: None,
204            no_fast_path: false,
205            persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
206            features: OptimizerFeatures::from(vars),
207        }
208    }
209}
210
211/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
212impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
213    fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
214        self.features = self.features.override_from(overrides);
215        self
216    }
217}
218
219/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
220impl OverrideFrom<ExplainContext> for OptimizerConfig {
221    fn override_from(mut self, ctx: &ExplainContext) -> Self {
222        let ExplainContext::Plan(ctx) = ctx else {
223            return self; // Return immediately for all other contexts.
224        };
225
226        // Override general parameters.
227        self.mode = OptimizeMode::Explain;
228        self.replan = ctx.replan;
229        self.no_fast_path = ctx.config.no_fast_path;
230
231        // Override feature flags that can be enabled in the EXPLAIN config.
232        self.features = self.features.override_from(&ctx.config.features);
233
234        // Return the final result.
235        self
236    }
237}
238
239impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
240    fn from(config: &OptimizerConfig) -> Self {
241        Self {
242            enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
243            enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
244            enable_guard_subquery_tablefunc: config.features.enable_guard_subquery_tablefunc,
245        }
246    }
247}
248
249// OptimizerCatalog
250// ===============
251
252pub trait OptimizerCatalog: Debug + Send + Sync {
253    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
254    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
255    fn resolve_full_name(
256        &self,
257        name: &QualifiedItemName,
258        conn_id: Option<&ConnectionId>,
259    ) -> FullItemName;
260
261    /// Returns all indexes on the given object and cluster known in the
262    /// catalog.
263    fn get_indexes_on(
264        &self,
265        id: GlobalId,
266        cluster: ClusterId,
267    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
268}
269
270// OptimizerError
271// ===============
272
273/// Error types that can be generated during optimization.
274#[derive(Debug, thiserror::Error)]
275pub enum OptimizerError {
276    #[error("{0}")]
277    PlanError(#[from] PlanError),
278    #[error("{0}")]
279    RecursionLimitError(#[from] RecursionLimitError),
280    #[error("{0}")]
281    TransformError(#[from] TransformError),
282    #[error("{0}")]
283    EvalError(#[from] EvalError),
284    #[error("cannot materialize call to {0}")]
285    UnmaterializableFunction(UnmaterializableFunc),
286    #[error("cannot call {func} in {context} ")]
287    UncallableFunction {
288        func: UnmaterializableFunc,
289        context: &'static str,
290    },
291    #[error("{0}")]
292    UnsupportedTemporalExpression(String),
293    /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
294    /// catch it and swallow it in some cases.
295    #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
296    InternalUnsafeMfpPlan(String),
297    #[error("internal optimizer error: {0}")]
298    Internal(String),
299}
300
301impl From<String> for OptimizerError {
302    fn from(msg: String) -> Self {
303        Self::Internal(msg)
304    }
305}
306
307impl OptimizerError {
308    pub fn detail(&self) -> Option<String> {
309        match self {
310            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
311                Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
312            }
313            _ => None,
314        }
315    }
316
317    pub fn hint(&self) -> Option<String> {
318        match self {
319            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
320                Some("In temporal filters `mz_now()` may work instead.".into())
321            }
322            _ => None,
323        }
324    }
325}
326
327impl From<TimestampError> for OptimizerError {
328    fn from(value: TimestampError) -> Self {
329        OptimizerError::EvalError(EvalError::from(value))
330    }
331}
332
333impl From<anyhow::Error> for OptimizerError {
334    fn from(value: anyhow::Error) -> Self {
335        OptimizerError::Internal(value.to_string())
336    }
337}
338
339impl MaybeShouldPanic for OptimizerError {
340    fn should_panic(&self) -> Option<String> {
341        match self {
342            OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
343                Some(msg.to_string())
344            }
345            _ => None,
346        }
347    }
348}
349
350// Tracing helpers
351// ---------------
352
353#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
354fn optimize_mir_local(
355    expr: MirRelationExpr,
356    ctx: &mut TransformCtx,
357) -> Result<OptimizedMirRelationExpr, OptimizerError> {
358    #[allow(deprecated)]
359    let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
360    let expr = optimizer.optimize(expr, ctx)?;
361
362    // Trace the result of this phase.
363    mz_repr::explain::trace_plan(expr.as_inner());
364
365    Ok::<_, OptimizerError>(expr)
366}
367
368/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
369/// running it, and tracing the result plan.
370#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
371fn optimize_mir_constant(
372    expr: MirRelationExpr,
373    ctx: &mut TransformCtx,
374) -> Result<MirRelationExpr, OptimizerError> {
375    let optimizer = mz_transform::Optimizer::constant_optimizer(ctx);
376    let expr = optimizer.optimize(expr, ctx)?;
377
378    // Trace the result of this phase.
379    mz_repr::explain::trace_plan(expr.as_inner());
380
381    Ok::<_, OptimizerError>(expr.0)
382}
383
/// Traces `$plan` inside a `debug`-level span named `$span` under the
/// `optimizer` target.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
            let () = mz_repr::explain::trace_plan($plan);
        });
    };
}

use trace_plan;
393
394use crate::coord::ExplainContext;