mz_adapter/optimize.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
12//! The goal of this crate is to abstract optimizer specifics behind a
13//! high-level interface that is ready to be consumed by the coordinator code in
14//! a future-proof way (that is, the API is taking the upcoming evolution of
15//! these components into account).
16//!
17//! The contents of this crate should have minimal dependencies to the rest of
18//! the coordinator code so we can pull them out as a separate crate in the
19//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//! required to optimize a statement of type `T` end-to-end (for example
26//! [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
27//! - Each struct implements [`Optimize`] once for each optimization stage. The
28//! `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entrypoint.
31//! - The concrete types used for stage results are opaque structs that are
32//! specific to the pipeline of that statement type.
33//! - We use different structs even if two statement types might have
34//! structurally identical intermediate results. This ensures that client
35//! code cannot first execute some optimization stages for one type and then
36//! some stages for a different type.
37//! - The only way to construct such a struct is by running the [`Optimize`]
38//! stage that produces it. This ensures that client code cannot interfere
39//! with the pipeline.
40//! - In general, the internals of these structs can be accessed only behind a
41//! shared reference. This ensures that client code can look up information
42//! from intermediate stages but cannot modify it.
43//! - Timestamp selection is modeled as a conversion between structs that are
44//! adjacent in the pipeline using a method called `resolve`.
45//! - The struct representing the result of the final stage of the
46//! optimization pipeline can be destructed to access its internals with a
47//! method called `unapply`.
48//! - The `Send` trait bounds on the `Self` and `From` types ensure that
//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::plan::Plan;
70use mz_controller_types::ClusterId;
71use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
72use mz_ore::stack::RecursionLimitError;
73use mz_repr::adt::timestamp::TimestampError;
74use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
75use mz_repr::{CatalogItemId, GlobalId};
76use mz_sql::names::{FullItemName, QualifiedItemName};
77use mz_sql::plan::PlanError;
78use mz_sql::session::vars::SystemVars;
79use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
80
81// Alias types
82// -----------
83
/// Alias for a [`DataflowDescription`] whose plans are
/// [`OptimizedMirRelationExpr`] values (the `Mir~` representation). Used
/// internally by the optimizer implementations.
type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
/// Alias for a [`DataflowDescription`] whose plans are [`Plan`] values (the
/// `Lir~` representation). Used internally by the optimizer implementations.
type LirDataflowDescription = DataflowDescription<Plan>;
90
91// Core API
92// --------
93
94/// A trait that represents an optimization stage.
95///
96/// The trait is implemented by structs that encapsulate the context needed to
97/// run an end-to-end optimization pipeline for a specific statement type
98/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
99///
100/// Each implementation represents a concrete optimization stage for a fixed
101/// statement type that consumes an input of type `From` and produces output of
102/// type `Self::To`.
103pub trait Optimize<From> {
104 type To;
105
106 /// Execute the optimization stage, transforming the input plan of type
107 /// `From` to an output plan of type `To`.
108 fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
109
110 /// Like [`Self::optimize`], but additionally ensures that panics occurring
111 /// in the [`Self::optimize`] call are caught and demoted to an
112 /// [`OptimizerError::Internal`] error.
113 ///
114 /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
115 /// should panic, then panic.
116 #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
117 fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
118 mz_transform::catch_unwind_optimize(|| self.optimize(plan))
119 }
120}
121
122// Optimizer configuration
123// -----------------------
124
/// Configuration for an optimizer run: the optimization mode, the replan and
/// fast-path switches, and the optimizer feature flags.
///
/// A configuration can be built from [`SystemVars`] and then layered with
/// overrides through the [`OverrideFrom`] implementations in this module.
///
/// # Adding a new feature flag
///
/// To add a new feature flag, do the following steps:
///
/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
///    and allow engineers to set a system-wide override:
///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
///
/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
///    inspect plan differences before deploying the optimizer changes:
///    1. Add the flag to the `ExplainPlanOptionName` definition.
///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
///       ...)` macro call.
///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
///       [`mz_repr::explain::ExplainConfig`].
///
/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
///    engineers to experiment with runtime differences before deploying the
///    optimizer changes:
///    1. Add the flag to the `ClusterFeatureName` definition.
///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
///       macro call.
///    3. Extend the `let optimizer_feature_overrides = ...` call in
///       `plan_create_cluster`.
#[derive(Clone, Debug)]
pub struct OptimizerConfig {
    /// The mode in which the optimizer runs.
    pub mode: OptimizeMode,
    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
    ///
    /// This means that it will not consider catalog items (more specifically
    /// indexes) with a [`GlobalId`] greater than or equal to the one provided
    /// here.
    pub replan: Option<GlobalId>,
    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
    /// Enforced if `timing` is set.
    pub no_fast_path: bool,
    /// If set, allow some additional queries down the Persist fast path when we believe
    /// the orderings are compatible.
    ///
    /// Read from the `PERSIST_FAST_PATH_ORDER` dyncfg when the configuration
    /// is built from [`SystemVars`].
    persist_fast_path_order: bool,
    /// Optimizer feature flags.
    pub features: OptimizerFeatures,
}
169
/// The purpose for which a statement is being optimized.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum OptimizeMode {
    /// A mode where the optimized statement is executed.
    Execute,
    /// A mode where the optimized statement is explained.
    Explain,
}
177
178impl From<&SystemVars> for OptimizerConfig {
179 fn from(vars: &SystemVars) -> Self {
180 Self {
181 mode: OptimizeMode::Execute,
182 replan: None,
183 no_fast_path: false,
184 persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
185 features: OptimizerFeatures::from(vars),
186 }
187 }
188}
189
190/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
191impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
192 fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
193 self.features = self.features.override_from(overrides);
194 self
195 }
196}
197
198/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
199impl OverrideFrom<ExplainContext> for OptimizerConfig {
200 fn override_from(mut self, ctx: &ExplainContext) -> Self {
201 let ExplainContext::Plan(ctx) = ctx else {
202 return self; // Return immediately for all other contexts.
203 };
204
205 // Override general parameters.
206 self.mode = OptimizeMode::Explain;
207 self.replan = ctx.replan;
208 self.no_fast_path = ctx.config.no_fast_path;
209
210 // Override feature flags that can be enabled in the EXPLAIN config.
211 self.features = self.features.override_from(&ctx.config.features);
212
213 // Return the final result.
214 self
215 }
216}
217
218impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
219 fn from(config: &OptimizerConfig) -> Self {
220 Self {
221 enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
222 enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
223 enable_guard_subquery_tablefunc: config.features.enable_guard_subquery_tablefunc,
224 enable_cast_elimination: config.features.enable_cast_elimination,
225 }
226 }
227}
228
229// OptimizerCatalog
230// ===============
231
/// Catalog lookups used by the optimizer.
///
/// Implementors must be [`Debug`], [`Send`] and [`Sync`].
pub trait OptimizerCatalog: Debug + Send + Sync {
    /// Returns the [`CatalogCollectionEntry`] for the given [`GlobalId`].
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
    /// Returns a reference to the [`CatalogEntry`] for the given
    /// [`CatalogItemId`].
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
    /// Resolves a [`QualifiedItemName`] into a [`FullItemName`].
    ///
    /// NOTE(review): `conn_id` is presumably needed to resolve names that are
    /// scoped to a connection — confirm against the implementation.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName;

    /// Returns all indexes on the given object and cluster known in the
    /// catalog.
    ///
    /// Each item pairs a [`GlobalId`] (presumably the id of the index itself —
    /// TODO confirm) with a reference to the [`Index`].
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
}
249
250// OptimizerError
251// ===============
252
253/// Error types that can be generated during optimization.
254#[derive(Debug, thiserror::Error)]
255pub enum OptimizerError {
256 #[error("{0}")]
257 PlanError(#[from] PlanError),
258 #[error("{0}")]
259 RecursionLimitError(#[from] RecursionLimitError),
260 #[error("{0}")]
261 TransformError(#[from] TransformError),
262 #[error("{0}")]
263 EvalError(#[from] EvalError),
264 #[error("cannot materialize call to {0}")]
265 UnmaterializableFunction(UnmaterializableFunc),
266 #[error("cannot call {func} in {context} ")]
267 UncallableFunction {
268 func: UnmaterializableFunc,
269 context: &'static str,
270 },
271 #[error("{0}")]
272 UnsupportedTemporalExpression(String),
273 /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
274 /// catch it and swallow it in some cases.
275 #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
276 InternalUnsafeMfpPlan(String),
277 #[error("internal optimizer error: {0}")]
278 Internal(String),
279}
280
281impl From<String> for OptimizerError {
282 fn from(msg: String) -> Self {
283 Self::Internal(msg)
284 }
285}
286
287impl OptimizerError {
288 pub fn detail(&self) -> Option<String> {
289 match self {
290 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
291 Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
292 }
293 _ => None,
294 }
295 }
296
297 pub fn hint(&self) -> Option<String> {
298 match self {
299 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
300 Some("In temporal filters `mz_now()` may work instead.".into())
301 }
302 _ => None,
303 }
304 }
305}
306
307impl From<TimestampError> for OptimizerError {
308 fn from(value: TimestampError) -> Self {
309 OptimizerError::EvalError(EvalError::from(value))
310 }
311}
312
313impl From<anyhow::Error> for OptimizerError {
314 fn from(value: anyhow::Error) -> Self {
315 OptimizerError::Internal(value.to_string())
316 }
317}
318
319impl MaybeShouldPanic for OptimizerError {
320 fn should_panic(&self) -> Option<String> {
321 match self {
322 OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
323 Some(msg.to_string())
324 }
325 _ => None,
326 }
327 }
328}
329
330// Tracing helpers
331// ---------------
332
333#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
334fn optimize_mir_local(
335 expr: MirRelationExpr,
336 ctx: &mut TransformCtx,
337) -> Result<OptimizedMirRelationExpr, OptimizerError> {
338 #[allow(deprecated)]
339 let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
340 let expr = optimizer.optimize(expr, ctx)?;
341
342 // Trace the result of this phase.
343 mz_repr::explain::trace_plan(expr.as_inner());
344
345 Ok::<_, OptimizerError>(expr)
346}
347
348/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
349/// running it, and tracing the result plan.
350#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
351fn optimize_mir_constant(
352 expr: MirRelationExpr,
353 ctx: &mut TransformCtx,
354 limit: bool,
355) -> Result<MirRelationExpr, OptimizerError> {
356 let optimizer = mz_transform::Optimizer::constant_optimizer(ctx, limit);
357 let expr = optimizer.optimize(expr, ctx)?;
358
359 // Trace the result of this phase.
360 mz_repr::explain::trace_plan(expr.as_inner());
361
362 Ok::<_, OptimizerError>(expr.0)
363}
364
/// Traces `$plan` with [`mz_repr::explain::trace_plan`] inside a debug-level
/// span with target `"optimizer"` and the name given by the `$span` literal.
///
/// Invoked as `trace_plan!(at: "<span name>", <plan expression>)`. The plan
/// expression is passed to `mz_repr::explain::trace_plan` unchanged.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        // `$plan` is evaluated inside the closure handed to `in_scope`, not
        // before the span is created.
        tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
            mz_repr::explain::trace_plan($plan);
        });
    }
}
372
373use trace_plan;
374
375use crate::coord::ExplainContext;