mz_adapter/
optimize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
//! The goal of this module is to abstract optimizer specifics behind a
//! high-level interface that is ready to be consumed by the coordinator code in
//! a future-proof way (that is, the API takes the upcoming evolution of these
//! components into account).
//!
//! The contents of this module should have minimal dependencies on the rest of
//! the coordinator code so we can pull them out as a separate crate in the
//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//!   required to optimize a statement of type `T` end-to-end (for example
26//!   [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
//! - Each struct implements [`Optimize`] once for each optimization stage. The
//!   `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entry point.
31//! - The concrete types used for stage results are opaque structs that are
32//!   specific to the pipeline of that statement type.
33//!   - We use different structs even if two statement types might have
34//!     structurally identical intermediate results. This ensures that client
35//!     code cannot first execute some optimization stages for one type and then
36//!     some stages for a different type.
37//!   - The only way to construct such a struct is by running the [`Optimize`]
38//!     stage that produces it. This ensures that client code cannot interfere
39//!     with the pipeline.
40//!   - In general, the internals of these structs can be accessed only behind a
41//!     shared reference. This ensures that client code can look up information
42//!     from intermediate stages but cannot modify it.
43//!   - Timestamp selection is modeled as a conversion between structs that are
44//!     adjacent in the pipeline using a method called `resolve`.
//!   - The struct representing the result of the final stage of the
//!     optimization pipeline can be destructured to access its internals with a
//!     method called `unapply`.
//! - The `Send` trait bounds on the `Self` and `From` types ensure that
//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::plan::Plan;
70use mz_controller_types::ClusterId;
71use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
72use mz_ore::stack::RecursionLimitError;
73use mz_repr::adt::timestamp::TimestampError;
74use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
75use mz_repr::{CatalogItemId, GlobalId};
76use mz_sql::names::{FullItemName, QualifiedItemName};
77use mz_sql::plan::PlanError;
78use mz_sql::session::vars::SystemVars;
79use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
80
81// Alias types
82// -----------
83
84/// A type for a [`DataflowDescription`] backed by `Mir~` plans. Used internally
85/// by the optimizer implementations.
86type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
87/// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally
88/// by the optimizer implementations.
89type LirDataflowDescription = DataflowDescription<Plan>;
90
91// Core API
92// --------
93
94/// A trait that represents an optimization stage.
95///
96/// The trait is implemented by structs that encapsulate the context needed to
97/// run an end-to-end optimization pipeline for a specific statement type
98/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
99///
100/// Each implementation represents a concrete optimization stage for a fixed
101/// statement type that consumes an input of type `From` and produces output of
102/// type `Self::To`.
103///
104/// The generic lifetime `'ctx` models the lifetime of the optimizer context and
105/// can be passed to the optimizer struct and the `Self::To` types.
106///
107/// The `'s: 'ctx` bound in the `optimize` method call ensures that an optimizer
108/// instance can run an optimization stage that produces a `Self::To` with
109/// `&'ctx` references.
110pub trait Optimize<From>: Send
111where
112    From: Send,
113{
114    type To: Send;
115
116    /// Execute the optimization stage, transforming the input plan of type
117    /// `From` to an output plan of type `To`.
118    fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
119
120    /// Like [`Self::optimize`], but additionally ensures that panics occurring
121    /// in the [`Self::optimize`] call are caught and demoted to an
122    /// [`OptimizerError::Internal`] error.
123    ///
124    /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
125    /// should panic, then panic.
126    #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
127    fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
128        mz_transform::catch_unwind_optimize(|| self.optimize(plan))
129    }
130
131    /// Execute the optimization stage and panic if an error occurs.
132    ///
133    /// See [`Optimize::optimize`].
134    #[allow(dead_code)] // This function is never used, but it's useful to keep around.
135    fn must_optimize(&mut self, expr: From) -> Self::To {
136        match self.optimize(expr) {
137            Ok(ok) => ok,
138            Err(err) => panic!("must_optimize call failed: {err}"),
139        }
140    }
141}
142
143// Optimizer configuration
144// -----------------------
145
146/// Feature flags for the optimizer.
147///
148/// To add a new feature flag, do the following steps:
149///
150/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
151///    and allow engineers to set a system-wide override:
152///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
153///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
154///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
155///
156/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
157///    inspect plan differences before deploying the optimizer changes:
158///    1. Add the flag to the `ExplainPlanOptionName` definition.
159///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
160///       ...)` macro call.
161///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
162///       [`mz_repr::explain::ExplainConfig`].
163///
164/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
165///    engineers to experiment with runtime differences before deploying the
166///    optimizer changes:
167///    1. Add the flag to the `ClusterFeatureName` definition.
168///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
169///       macro call.
170///    3. Extend the `let optimizer_feature_overrides = ...` call in
171///       `plan_create_cluster`.
172#[derive(Clone, Debug)]
173pub struct OptimizerConfig {
174    /// The mode in which the optimizer runs.
175    pub mode: OptimizeMode,
176    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
177    ///
178    /// This means that it will not consider catalog items (more specifically
179    /// indexes) with [`GlobalId`] greater or equal than the one provided here.
180    pub replan: Option<GlobalId>,
181    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
182    /// Enforced if `timing` is set.
183    pub no_fast_path: bool,
184    // If set, allow some additional queries down the Persist fast path when we believe
185    // the orderings are compatible.
186    persist_fast_path_order: bool,
187    /// Optimizer feature flags.
188    pub features: OptimizerFeatures,
189}
190
/// Describes what the output of an optimization run is used for.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// The optimized statement is going to be executed.
    Execute,
    /// The optimized statement is going to be explained (e.g. by `EXPLAIN`).
    Explain,
}
198
199impl From<&SystemVars> for OptimizerConfig {
200    fn from(vars: &SystemVars) -> Self {
201        Self {
202            mode: OptimizeMode::Execute,
203            replan: None,
204            no_fast_path: false,
205            persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
206            features: OptimizerFeatures::from(vars),
207        }
208    }
209}
210
211/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
212impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
213    fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
214        self.features = self.features.override_from(overrides);
215        self
216    }
217}
218
219/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
220impl OverrideFrom<ExplainContext> for OptimizerConfig {
221    fn override_from(mut self, ctx: &ExplainContext) -> Self {
222        let ExplainContext::Plan(ctx) = ctx else {
223            return self; // Return immediately for all other contexts.
224        };
225
226        // Override general parameters.
227        self.mode = OptimizeMode::Explain;
228        self.replan = ctx.replan;
229        self.no_fast_path = ctx.config.no_fast_path;
230
231        // Override feature flags that can be enabled in the EXPLAIN config.
232        self.features = self.features.override_from(&ctx.config.features);
233
234        // Return the final result.
235        self
236    }
237}
238
239impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
240    fn from(config: &OptimizerConfig) -> Self {
241        Self {
242            enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
243            enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
244        }
245    }
246}
247
248// OptimizerCatalog
249// ===============
250
251pub trait OptimizerCatalog: Debug + Send + Sync {
252    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
253    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
254    fn resolve_full_name(
255        &self,
256        name: &QualifiedItemName,
257        conn_id: Option<&ConnectionId>,
258    ) -> FullItemName;
259
260    /// Returns all indexes on the given object and cluster known in the
261    /// catalog.
262    fn get_indexes_on(
263        &self,
264        id: GlobalId,
265        cluster: ClusterId,
266    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
267}
268
269// OptimizerError
270// ===============
271
272/// Error types that can be generated during optimization.
273#[derive(Debug, thiserror::Error)]
274pub enum OptimizerError {
275    #[error("{0}")]
276    PlanError(#[from] PlanError),
277    #[error("{0}")]
278    RecursionLimitError(#[from] RecursionLimitError),
279    #[error("{0}")]
280    TransformError(#[from] TransformError),
281    #[error("{0}")]
282    EvalError(#[from] EvalError),
283    #[error("cannot materialize call to {0}")]
284    UnmaterializableFunction(UnmaterializableFunc),
285    #[error("cannot call {func} in {context} ")]
286    UncallableFunction {
287        func: UnmaterializableFunc,
288        context: &'static str,
289    },
290    #[error("MfpPlan couldn't be converted into SafeMfpPlan")]
291    UnsafeMfpPlan,
292    #[error("internal optimizer error: {0}")]
293    Internal(String),
294}
295
296impl From<String> for OptimizerError {
297    fn from(msg: String) -> Self {
298        Self::Internal(msg)
299    }
300}
301
302impl OptimizerError {
303    pub fn detail(&self) -> Option<String> {
304        match self {
305            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
306                Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
307            }
308            _ => None,
309        }
310    }
311
312    pub fn hint(&self) -> Option<String> {
313        match self {
314            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
315                Some("In temporal filters `mz_now()` may work instead.".into())
316            }
317            _ => None,
318        }
319    }
320}
321
322impl From<TimestampError> for OptimizerError {
323    fn from(value: TimestampError) -> Self {
324        OptimizerError::EvalError(EvalError::from(value))
325    }
326}
327
328impl From<anyhow::Error> for OptimizerError {
329    fn from(value: anyhow::Error) -> Self {
330        OptimizerError::Internal(value.to_string())
331    }
332}
333
334impl MaybeShouldPanic for OptimizerError {
335    fn should_panic(&self) -> Option<String> {
336        match self {
337            OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
338                Some(msg.to_string())
339            }
340            _ => None,
341        }
342    }
343}
344
345// Tracing helpers
346// ---------------
347
348#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
349fn optimize_mir_local(
350    expr: MirRelationExpr,
351    ctx: &mut TransformCtx,
352) -> Result<OptimizedMirRelationExpr, OptimizerError> {
353    #[allow(deprecated)]
354    let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
355    let expr = optimizer.optimize(expr, ctx)?;
356
357    // Trace the result of this phase.
358    mz_repr::explain::trace_plan(expr.as_inner());
359
360    Ok::<_, OptimizerError>(expr)
361}
362
363/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
364/// running it, and tracing the result plan.
365#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
366fn optimize_mir_constant(
367    expr: MirRelationExpr,
368    ctx: &mut TransformCtx,
369) -> Result<MirRelationExpr, OptimizerError> {
370    let optimizer = mz_transform::Optimizer::constant_optimizer(ctx);
371    let expr = optimizer.optimize(expr, ctx)?;
372
373    // Trace the result of this phase.
374    mz_repr::explain::trace_plan(expr.as_inner());
375
376    Ok::<_, OptimizerError>(expr.0)
377}
378
/// Traces `$plan` via [`mz_repr::explain::trace_plan`] inside a `debug`-level
/// span named `$span` under the `optimizer` target.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {{
        let span = tracing::debug_span!(target: "optimizer", $span);
        span.in_scope(|| {
            mz_repr::explain::trace_plan($plan);
        });
    }};
}

// The submodules are declared before this macro, so `macro_rules!` textual
// scoping does not reach them; this `use` makes the macro nameable by path.
use trace_plan;
388
389use crate::coord::ExplainContext;