mz_adapter/optimize.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
//! The goal of this module is to abstract optimizer specifics behind a
//! high-level interface that is ready to be consumed by the coordinator code in
//! a future-proof way (that is, the API is taking the upcoming evolution of
//! these components into account).
16//!
//! The contents of this module should have minimal dependencies on the rest of
//! the coordinator code so we can pull them out as a separate crate in the
//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//! required to optimize a statement of type `T` end-to-end (for example
26//! [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
//! - Each struct implements [`Optimize`] once for each optimization stage. The
//!   `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entrypoint.
31//! - The concrete types used for stage results are opaque structs that are
32//! specific to the pipeline of that statement type.
33//! - We use different structs even if two statement types might have
34//! structurally identical intermediate results. This ensures that client
35//! code cannot first execute some optimization stages for one type and then
36//! some stages for a different type.
37//! - The only way to construct such a struct is by running the [`Optimize`]
38//! stage that produces it. This ensures that client code cannot interfere
39//! with the pipeline.
40//! - In general, the internals of these structs can be accessed only behind a
41//! shared reference. This ensures that client code can look up information
42//! from intermediate stages but cannot modify it.
43//! - Timestamp selection is modeled as a conversion between structs that are
44//! adjacent in the pipeline using a method called `resolve`.
45//! - The struct representing the result of the final stage of the
46//! optimization pipeline can be destructed to access its internals with a
47//! method called `unapply`.
//! - The `Send` trait bounds on the `Self` and `From` types ensure that
//!   [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::dataflows::DataflowDescription;
69use mz_compute_types::plan::Plan;
70use mz_controller_types::ClusterId;
71use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
72use mz_ore::stack::RecursionLimitError;
73use mz_repr::adt::timestamp::TimestampError;
74use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
75use mz_repr::{CatalogItemId, GlobalId};
76use mz_sql::names::{FullItemName, QualifiedItemName};
77use mz_sql::plan::PlanError;
78use mz_sql::session::vars::SystemVars;
79use mz_transform::{MaybeShouldPanic, TransformCtx, TransformError};
80
81// Alias types
82// -----------
83
/// A [`DataflowDescription`] parameterized by [`OptimizedMirRelationExpr`],
/// that is, a dataflow description backed by `Mir~` plans. Used internally by
/// the optimizer implementations.
type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
/// A [`DataflowDescription`] parameterized by [`Plan`], that is, a dataflow
/// description backed by `Lir~` plans. Used internally by the optimizer
/// implementations.
type LirDataflowDescription = DataflowDescription<Plan>;
90
91// Core API
92// --------
93
94/// A trait that represents an optimization stage.
95///
96/// The trait is implemented by structs that encapsulate the context needed to
97/// run an end-to-end optimization pipeline for a specific statement type
98/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
99///
100/// Each implementation represents a concrete optimization stage for a fixed
101/// statement type that consumes an input of type `From` and produces output of
102/// type `Self::To`.
103///
104/// The generic lifetime `'ctx` models the lifetime of the optimizer context and
105/// can be passed to the optimizer struct and the `Self::To` types.
106///
107/// The `'s: 'ctx` bound in the `optimize` method call ensures that an optimizer
108/// instance can run an optimization stage that produces a `Self::To` with
109/// `&'ctx` references.
110pub trait Optimize<From>: Send
111where
112 From: Send,
113{
114 type To: Send;
115
116 /// Execute the optimization stage, transforming the input plan of type
117 /// `From` to an output plan of type `To`.
118 fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
119
120 /// Like [`Self::optimize`], but additionally ensures that panics occurring
121 /// in the [`Self::optimize`] call are caught and demoted to an
122 /// [`OptimizerError::Internal`] error.
123 ///
124 /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
125 /// should panic, then panic.
126 #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
127 fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
128 mz_transform::catch_unwind_optimize(|| self.optimize(plan))
129 }
130
131 /// Execute the optimization stage and panic if an error occurs.
132 ///
133 /// See [`Optimize::optimize`].
134 #[allow(dead_code)] // This function is never used, but it's useful to keep around.
135 fn must_optimize(&mut self, expr: From) -> Self::To {
136 match self.optimize(expr) {
137 Ok(ok) => ok,
138 Err(err) => panic!("must_optimize call failed: {err}"),
139 }
140 }
141}
142
143// Optimizer configuration
144// -----------------------
145
/// Configuration for the optimizer, including its feature flags.
///
/// To add a new feature flag, do the following steps:
///
/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
///    and allow engineers to set a system-wide override:
///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
///
/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
///    inspect plan differences before deploying the optimizer changes:
///    1. Add the flag to the `ExplainPlanOptionName` definition.
///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
///       ...)` macro call.
///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
///       [`mz_repr::explain::ExplainConfig`].
///
/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
///    engineers to experiment with runtime differences before deploying the
///    optimizer changes:
///    1. Add the flag to the `ClusterFeatureName` definition.
///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
///       macro call.
///    3. Extend the `let optimizer_feature_overrides = ...` call in
///       `plan_create_cluster`.
#[derive(Clone, Debug)]
pub struct OptimizerConfig {
    /// The mode in which the optimizer runs.
    pub mode: OptimizeMode,
    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
    ///
    /// This means that it will not consider catalog items (more specifically
    /// indexes) with [`GlobalId`] greater or equal than the one provided here.
    pub replan: Option<GlobalId>,
    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
    /// Enforced if `timing` is set.
    pub no_fast_path: bool,
    /// If set, allow some additional queries down the Persist fast path when we believe
    /// the orderings are compatible.
    ///
    /// This field is private to this module; when building from `SystemVars`
    /// it is read from the `PERSIST_FAST_PATH_ORDER` dyncfg.
    persist_fast_path_order: bool,
    /// Optimizer feature flags.
    pub features: OptimizerFeatures,
}
190
/// The mode in which the optimizer runs.
///
/// [`OptimizerConfig`] built from `SystemVars` starts out in
/// [`OptimizeMode::Execute`]; applying an `ExplainContext::Plan` override
/// switches it to [`OptimizeMode::Explain`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// A mode where the optimized statement is executed.
    Execute,
    /// A mode where the optimized statement is explained.
    Explain,
}
198
199impl From<&SystemVars> for OptimizerConfig {
200 fn from(vars: &SystemVars) -> Self {
201 Self {
202 mode: OptimizeMode::Execute,
203 replan: None,
204 no_fast_path: false,
205 persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
206 features: OptimizerFeatures::from(vars),
207 }
208 }
209}
210
211/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
212impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
213 fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
214 self.features = self.features.override_from(overrides);
215 self
216 }
217}
218
219/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
220impl OverrideFrom<ExplainContext> for OptimizerConfig {
221 fn override_from(mut self, ctx: &ExplainContext) -> Self {
222 let ExplainContext::Plan(ctx) = ctx else {
223 return self; // Return immediately for all other contexts.
224 };
225
226 // Override general parameters.
227 self.mode = OptimizeMode::Explain;
228 self.replan = ctx.replan;
229 self.no_fast_path = ctx.config.no_fast_path;
230
231 // Override feature flags that can be enabled in the EXPLAIN config.
232 self.features = self.features.override_from(&ctx.config.features);
233
234 // Return the final result.
235 self
236 }
237}
238
239impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
240 fn from(config: &OptimizerConfig) -> Self {
241 Self {
242 enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
243 enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
244 enable_guard_subquery_tablefunc: config.features.enable_guard_subquery_tablefunc,
245 }
246 }
247}
248
249// OptimizerCatalog
250// ===============
251
/// The catalog lookups that are available through this optimizer interface.
///
/// Implementors must be [`Debug`], [`Send`], and [`Sync`].
pub trait OptimizerCatalog: Debug + Send + Sync {
    /// Returns the [`CatalogCollectionEntry`] for the given [`GlobalId`].
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
    /// Returns a reference to the [`CatalogEntry`] for the given
    /// [`CatalogItemId`].
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
    /// Turns a [`QualifiedItemName`] into a [`FullItemName`], optionally in
    /// the context of the given connection.
    // NOTE(review): presumably `conn_id` is used to resolve names of
    // connection-local (temporary) items; confirm against the implementors.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName;

    /// Returns all indexes on the given object and cluster known in the
    /// catalog.
    ///
    /// Each item pairs the [`GlobalId`] of an index with a reference to the
    /// corresponding [`Index`].
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
}
269
270// OptimizerError
271// ===============
272
273/// Error types that can be generated during optimization.
274#[derive(Debug, thiserror::Error)]
275pub enum OptimizerError {
276 #[error("{0}")]
277 PlanError(#[from] PlanError),
278 #[error("{0}")]
279 RecursionLimitError(#[from] RecursionLimitError),
280 #[error("{0}")]
281 TransformError(#[from] TransformError),
282 #[error("{0}")]
283 EvalError(#[from] EvalError),
284 #[error("cannot materialize call to {0}")]
285 UnmaterializableFunction(UnmaterializableFunc),
286 #[error("cannot call {func} in {context} ")]
287 UncallableFunction {
288 func: UnmaterializableFunc,
289 context: &'static str,
290 },
291 #[error("{0}")]
292 UnsupportedTemporalExpression(String),
293 /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
294 /// catch it and swallow it in some cases.
295 #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
296 InternalUnsafeMfpPlan(String),
297 #[error("internal optimizer error: {0}")]
298 Internal(String),
299}
300
301impl From<String> for OptimizerError {
302 fn from(msg: String) -> Self {
303 Self::Internal(msg)
304 }
305}
306
307impl OptimizerError {
308 pub fn detail(&self) -> Option<String> {
309 match self {
310 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
311 Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
312 }
313 _ => None,
314 }
315 }
316
317 pub fn hint(&self) -> Option<String> {
318 match self {
319 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
320 Some("In temporal filters `mz_now()` may work instead.".into())
321 }
322 _ => None,
323 }
324 }
325}
326
327impl From<TimestampError> for OptimizerError {
328 fn from(value: TimestampError) -> Self {
329 OptimizerError::EvalError(EvalError::from(value))
330 }
331}
332
333impl From<anyhow::Error> for OptimizerError {
334 fn from(value: anyhow::Error) -> Self {
335 OptimizerError::Internal(value.to_string())
336 }
337}
338
339impl MaybeShouldPanic for OptimizerError {
340 fn should_panic(&self) -> Option<String> {
341 match self {
342 OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
343 Some(msg.to_string())
344 }
345 _ => None,
346 }
347 }
348}
349
350// Tracing helpers
351// ---------------
352
353#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
354fn optimize_mir_local(
355 expr: MirRelationExpr,
356 ctx: &mut TransformCtx,
357) -> Result<OptimizedMirRelationExpr, OptimizerError> {
358 #[allow(deprecated)]
359 let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
360 let expr = optimizer.optimize(expr, ctx)?;
361
362 // Trace the result of this phase.
363 mz_repr::explain::trace_plan(expr.as_inner());
364
365 Ok::<_, OptimizerError>(expr)
366}
367
368/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
369/// running it, and tracing the result plan.
370#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
371fn optimize_mir_constant(
372 expr: MirRelationExpr,
373 ctx: &mut TransformCtx,
374) -> Result<MirRelationExpr, OptimizerError> {
375 let optimizer = mz_transform::Optimizer::constant_optimizer(ctx);
376 let expr = optimizer.optimize(expr, ctx)?;
377
378 // Trace the result of this phase.
379 mz_repr::explain::trace_plan(expr.as_inner());
380
381 Ok::<_, OptimizerError>(expr.0)
382}
383
/// Traces `$plan` with [`mz_repr::explain::trace_plan`] inside a fresh debug
/// span.
///
/// The span is named by the `at:` literal and uses the `"optimizer"` target.
/// It is entered only for the duration of the `trace_plan` call.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
            mz_repr::explain::trace_plan($plan);
        });
    }
}
391
392use trace_plan;
393
394use crate::coord::ExplainContext;