mz_adapter/optimize.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
//! The goal of this module is to abstract optimizer specifics behind a
13//! high-level interface that is ready to be consumed by the coordinator code in
14//! a future-proof way (that is, the API is taking the upcoming evolution of
15//! these components into account).
16//!
//! The contents of this module should have minimal dependencies on the rest of
18//! the coordinator code so we can pull them out as a separate crate in the
19//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//! required to optimize a statement of type `T` end-to-end (for example
26//! [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
27//! - Each struct implements [`Optimize`] once for each optimization stage. The
28//! `From` type represents the input of the stage and `Self::To` the
//! associated stage output. This allows a pipeline to have more than one
//! entry point.
31//! - The concrete types used for stage results are opaque structs that are
32//! specific to the pipeline of that statement type.
33//! - We use different structs even if two statement types might have
34//! structurally identical intermediate results. This ensures that client
35//! code cannot first execute some optimization stages for one type and then
36//! some stages for a different type.
37//! - The only way to construct such a struct is by running the [`Optimize`]
38//! stage that produces it. This ensures that client code cannot interfere
39//! with the pipeline.
40//! - In general, the internals of these structs can be accessed only behind a
41//! shared reference. This ensures that client code can look up information
42//! from intermediate stages but cannot modify it.
43//! - Timestamp selection is modeled as a conversion between structs that are
44//! adjacent in the pipeline using a method called `resolve`.
45//! - The struct representing the result of the final stage of the
46//! optimization pipeline can be destructed to access its internals with a
47//! method called `unapply`.
48//! - The `Send` trait bounds on the `Self` and `From` types ensure that
49//! [`Optimize`] instances can be passed to different threads (this is
//! required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::plan::Plan;
70use mz_controller_types::ClusterId;
71use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
72use mz_ore::stack::RecursionLimitError;
73use mz_repr::adt::timestamp::TimestampError;
74use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
75use mz_repr::{CatalogItemId, GlobalId};
76use mz_sql::names::{FullItemName, QualifiedItemName};
77use mz_sql::plan::PlanError;
78use mz_sql::session::vars::SystemVars;
79use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
80
81// Alias types
82// -----------
83
84/// A type for a [`DataflowDescription`] backed by `Mir~` plans. Used internally
85/// by the optimizer implementations.
86type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
87/// A type for a [`DataflowDescription`] backed by `Lir~` plans. Used internally
88/// by the optimizer implementations.
89type LirDataflowDescription = DataflowDescription<Plan>;
90
91// Core API
92// --------
93
94/// A trait that represents an optimization stage.
95///
96/// The trait is implemented by structs that encapsulate the context needed to
97/// run an end-to-end optimization pipeline for a specific statement type
98/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
99///
100/// Each implementation represents a concrete optimization stage for a fixed
101/// statement type that consumes an input of type `From` and produces output of
102/// type `Self::To`.
103///
104/// The generic lifetime `'ctx` models the lifetime of the optimizer context and
105/// can be passed to the optimizer struct and the `Self::To` types.
106///
107/// The `'s: 'ctx` bound in the `optimize` method call ensures that an optimizer
108/// instance can run an optimization stage that produces a `Self::To` with
109/// `&'ctx` references.
110pub trait Optimize<From>: Send
111where
112 From: Send,
113{
114 type To: Send;
115
116 /// Execute the optimization stage, transforming the input plan of type
117 /// `From` to an output plan of type `To`.
118 fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
119
120 /// Like [`Self::optimize`], but additionally ensures that panics occurring
121 /// in the [`Self::optimize`] call are caught and demoted to an
122 /// [`OptimizerError::Internal`] error.
123 ///
124 /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
125 /// should panic, then panic.
126 #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
127 fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
128 mz_transform::catch_unwind_optimize(|| self.optimize(plan))
129 }
130
131 /// Execute the optimization stage and panic if an error occurs.
132 ///
133 /// See [`Optimize::optimize`].
134 #[allow(dead_code)] // This function is never used, but it's useful to keep around.
135 fn must_optimize(&mut self, expr: From) -> Self::To {
136 match self.optimize(expr) {
137 Ok(ok) => ok,
138 Err(err) => panic!("must_optimize call failed: {err}"),
139 }
140 }
141}
142
143// Optimizer configuration
144// -----------------------
145
146/// Feature flags for the optimizer.
147///
148/// To add a new feature flag, do the following steps:
149///
150/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
151/// and allow engineers to set a system-wide override:
152/// 1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
153/// 2. Add the flag to the `feature_flags!(...)` macro call and extend the
154/// `From<&SystemVars>` implementation for [`OptimizerFeatures`].
155///
156/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
157/// inspect plan differences before deploying the optimizer changes:
158/// 1. Add the flag to the `ExplainPlanOptionName` definition.
159/// 2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
160/// ...)` macro call.
161/// 3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
162/// [`mz_repr::explain::ExplainConfig`].
163///
164/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
165/// engineers to experiment with runtime differences before deploying the
166/// optimizer changes:
167/// 1. Add the flag to the `ClusterFeatureName` definition.
168/// 2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
169/// macro call.
170/// 3. Extend the `let optimizer_feature_overrides = ...` call in
171/// `plan_create_cluster`.
172#[derive(Clone, Debug)]
173pub struct OptimizerConfig {
174 /// The mode in which the optimizer runs.
175 pub mode: OptimizeMode,
176 /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
177 ///
178 /// This means that it will not consider catalog items (more specifically
179 /// indexes) with [`GlobalId`] greater or equal than the one provided here.
180 pub replan: Option<GlobalId>,
181 /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
182 /// Enforced if `timing` is set.
183 pub no_fast_path: bool,
184 // If set, allow some additional queries down the Persist fast path when we believe
185 // the orderings are compatible.
186 persist_fast_path_order: bool,
187 /// Optimizer feature flags.
188 pub features: OptimizerFeatures,
189}
190
191#[derive(Clone, Debug, PartialEq, Eq)]
192pub enum OptimizeMode {
193 /// A mode where the optimized statement is executed.
194 Execute,
195 /// A mode where the optimized statement is explained.
196 Explain,
197}
198
199impl From<&SystemVars> for OptimizerConfig {
200 fn from(vars: &SystemVars) -> Self {
201 Self {
202 mode: OptimizeMode::Execute,
203 replan: None,
204 no_fast_path: false,
205 persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
206 features: OptimizerFeatures::from(vars),
207 }
208 }
209}
210
211/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
212impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
213 fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
214 self.features = self.features.override_from(overrides);
215 self
216 }
217}
218
219/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
220impl OverrideFrom<ExplainContext> for OptimizerConfig {
221 fn override_from(mut self, ctx: &ExplainContext) -> Self {
222 let ExplainContext::Plan(ctx) = ctx else {
223 return self; // Return immediately for all other contexts.
224 };
225
226 // Override general parameters.
227 self.mode = OptimizeMode::Explain;
228 self.replan = ctx.replan;
229 self.no_fast_path = ctx.config.no_fast_path;
230
231 // Override feature flags that can be enabled in the EXPLAIN config.
232 self.features = self.features.override_from(&ctx.config.features);
233
234 // Return the final result.
235 self
236 }
237}
238
239impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
240 fn from(config: &OptimizerConfig) -> Self {
241 Self {
242 enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
243 enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
244 }
245 }
246}
247
248// OptimizerCatalog
249// ===============
250
251pub trait OptimizerCatalog: Debug + Send + Sync {
252 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
253 fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
254 fn resolve_full_name(
255 &self,
256 name: &QualifiedItemName,
257 conn_id: Option<&ConnectionId>,
258 ) -> FullItemName;
259
260 /// Returns all indexes on the given object and cluster known in the
261 /// catalog.
262 fn get_indexes_on(
263 &self,
264 id: GlobalId,
265 cluster: ClusterId,
266 ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
267}
268
269// OptimizerError
270// ===============
271
272/// Error types that can be generated during optimization.
273#[derive(Debug, thiserror::Error)]
274pub enum OptimizerError {
275 #[error("{0}")]
276 PlanError(#[from] PlanError),
277 #[error("{0}")]
278 RecursionLimitError(#[from] RecursionLimitError),
279 #[error("{0}")]
280 TransformError(#[from] TransformError),
281 #[error("{0}")]
282 EvalError(#[from] EvalError),
283 #[error("cannot materialize call to {0}")]
284 UnmaterializableFunction(UnmaterializableFunc),
285 #[error("cannot call {func} in {context} ")]
286 UncallableFunction {
287 func: UnmaterializableFunc,
288 context: &'static str,
289 },
290 #[error("MfpPlan couldn't be converted into SafeMfpPlan")]
291 UnsafeMfpPlan,
292 #[error("internal optimizer error: {0}")]
293 Internal(String),
294}
295
296impl From<String> for OptimizerError {
297 fn from(msg: String) -> Self {
298 Self::Internal(msg)
299 }
300}
301
302impl OptimizerError {
303 pub fn detail(&self) -> Option<String> {
304 match self {
305 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
306 Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
307 }
308 _ => None,
309 }
310 }
311
312 pub fn hint(&self) -> Option<String> {
313 match self {
314 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
315 Some("In temporal filters `mz_now()` may work instead.".into())
316 }
317 _ => None,
318 }
319 }
320}
321
322impl From<TimestampError> for OptimizerError {
323 fn from(value: TimestampError) -> Self {
324 OptimizerError::EvalError(EvalError::from(value))
325 }
326}
327
328impl From<anyhow::Error> for OptimizerError {
329 fn from(value: anyhow::Error) -> Self {
330 OptimizerError::Internal(value.to_string())
331 }
332}
333
334impl MaybeShouldPanic for OptimizerError {
335 fn should_panic(&self) -> Option<String> {
336 match self {
337 OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
338 Some(msg.to_string())
339 }
340 _ => None,
341 }
342 }
343}
344
345// Tracing helpers
346// ---------------
347
348#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
349fn optimize_mir_local(
350 expr: MirRelationExpr,
351 ctx: &mut TransformCtx,
352) -> Result<OptimizedMirRelationExpr, OptimizerError> {
353 #[allow(deprecated)]
354 let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
355 let expr = optimizer.optimize(expr, ctx)?;
356
357 // Trace the result of this phase.
358 mz_repr::explain::trace_plan(expr.as_inner());
359
360 Ok::<_, OptimizerError>(expr)
361}
362
363/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
364/// running it, and tracing the result plan.
365#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
366fn optimize_mir_constant(
367 expr: MirRelationExpr,
368 ctx: &mut TransformCtx,
369) -> Result<MirRelationExpr, OptimizerError> {
370 let optimizer = mz_transform::Optimizer::constant_optimizer(ctx);
371 let expr = optimizer.optimize(expr, ctx)?;
372
373 // Trace the result of this phase.
374 mz_repr::explain::trace_plan(expr.as_inner());
375
376 Ok::<_, OptimizerError>(expr.0)
377}
378
379macro_rules! trace_plan {
380 (at: $span:literal, $plan:expr) => {
381 tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
382 mz_repr::explain::trace_plan($plan);
383 });
384 }
385}
386
387use trace_plan;
388
389use crate::coord::ExplainContext;