mz_adapter/optimize.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer interface to the adapter and coordinator code.
11//!
//! The goal of this module is to abstract optimizer specifics behind a
//! high-level interface that is ready to be consumed by the coordinator code in
//! a future-proof way (that is, the API takes the upcoming evolution of these
//! components into account).
//!
//! The contents of this module should have minimal dependencies on the rest of
//! the coordinator code so we can pull them out as a separate crate in the
//! future without too much effort.
20//!
21//! The main type in this module is a very simple [`Optimize`] trait which
22//! allows us to adhere to the following principles:
23//!
24//! - Implementors of this trait are structs that encapsulate all context
25//! required to optimize a statement of type `T` end-to-end (for example
26//! [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
27//! - Each struct implements [`Optimize`] once for each optimization stage. The
28//! `From` type represents the input of the stage and `Self::To` the
//!   associated stage output. This allows a pipeline to have more than one
//!   entry point.
31//! - The concrete types used for stage results are opaque structs that are
32//! specific to the pipeline of that statement type.
33//! - We use different structs even if two statement types might have
34//! structurally identical intermediate results. This ensures that client
35//! code cannot first execute some optimization stages for one type and then
36//! some stages for a different type.
37//! - The only way to construct such a struct is by running the [`Optimize`]
38//! stage that produces it. This ensures that client code cannot interfere
39//! with the pipeline.
40//! - In general, the internals of these structs can be accessed only behind a
41//! shared reference. This ensures that client code can look up information
42//! from intermediate stages but cannot modify it.
43//! - Timestamp selection is modeled as a conversion between structs that are
44//! adjacent in the pipeline using a method called `resolve`.
45//! - The struct representing the result of the final stage of the
46//! optimization pipeline can be destructed to access its internals with a
47//! method called `unapply`.
48//! - The `Send` trait bounds on the `Self` and `From` types ensure that
49//! [`Optimize`] instances can be passed to different threads (this is
//!   required for off-thread optimization).
51//!
52//! For details, see the `20230714_optimizer_interface.md` design doc in this
53//! repository.
54
55pub mod copy_to;
56pub mod dataflows;
57pub mod index;
58pub mod materialized_view;
59pub mod peek;
60pub mod subscribe;
61pub mod view;
62
63use std::fmt::Debug;
64
65use mz_adapter_types::connection::ConnectionId;
66use mz_adapter_types::dyncfgs::PERSIST_FAST_PATH_ORDER;
67use mz_catalog::memory::objects::{CatalogCollectionEntry, CatalogEntry, Index};
68use mz_compute_types::ComputeInstanceId;
69use mz_compute_types::dataflows::DataflowDescription;
70use mz_compute_types::dyncfgs::SUBSCRIBE_SNAPSHOT_OPTIMIZATION;
71use mz_compute_types::plan::Plan;
72use mz_controller_types::ClusterId;
73use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
74use mz_ore::stack::RecursionLimitError;
75use mz_repr::adt::timestamp::TimestampError;
76use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
77use mz_repr::{CatalogItemId, GlobalId};
78use mz_sql::names::{FullItemName, QualifiedItemName};
79use mz_sql::plan::{HirRelationExpr, PlanError};
80use mz_sql::session::metadata::SessionMetadata;
81use mz_sql::session::vars::SystemVars;
82use mz_transform::{MaybeShouldPanic, StatisticsOracle, TransformCtx, TransformError};
83
84use crate::TimestampContext;
85
86// Alias types
87// -----------
88
/// A [`DataflowDescription`] parameterized over optimized MIR plans
/// ([`OptimizedMirRelationExpr`]). Used internally by the optimizer
/// implementations.
type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
/// A [`DataflowDescription`] parameterized over LIR plans ([`Plan`]). Used
/// internally by the optimizer implementations.
type LirDataflowDescription = DataflowDescription<Plan>;
95
96// Core API
97// --------
98
99/// A trait that represents an optimization stage.
100///
101/// The trait is implemented by structs that encapsulate the context needed to
102/// run an end-to-end optimization pipeline for a specific statement type
103/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
104///
105/// Each implementation represents a concrete optimization stage for a fixed
106/// statement type that consumes an input of type `From` and produces output of
107/// type `Self::To`.
108pub trait Optimize<From> {
109 type To;
110
111 /// Execute the optimization stage, transforming the input plan of type
112 /// `From` to an output plan of type `To`.
113 fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;
114
115 /// Like [`Self::optimize`], but additionally ensures that panics occurring
116 /// in the [`Self::optimize`] call are caught and demoted to an
117 /// [`OptimizerError::Internal`] error.
118 ///
119 /// Additionally, if the result of the optimization is an error (not a panic) that indicates we
120 /// should panic, then panic.
121 #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
122 fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
123 mz_transform::catch_unwind_optimize(|| self.optimize(plan))
124 }
125}
126
127// One-shot peek optimizer dispatch
128// --------------------------------
129
/// The optimizer driving a one-shot statement through the peek sequencing state
/// machine.
///
/// `SELECT` and `EXPLAIN` are optimized by the [`peek::Optimizer`], while
/// `COPY TO` is optimized by the [`copy_to::Optimizer`]. Both share the same
/// surrounding state machine (timestamp selection, read holds, off-thread
/// optimization, …), so this enum lets that shared machinery carry either
/// optimizer without caring which one it is. The variants are kept distinct
/// (rather than abstracted behind a trait object) because the downstream stages
/// need to recover the concrete optimizer and its statement-specific result
/// (see [`PeekOptimizer::into_select`] and [`PeekOptimizer::into_copy_to`]).
#[derive(Debug)]
pub enum PeekOptimizer {
    /// Optimizer for `SELECT` and `EXPLAIN` statements.
    Select(peek::Optimizer),
    /// Optimizer for `COPY TO` statements.
    CopyTo(copy_to::Optimizer),
}
147
/// The global LIR plan produced by [`PeekOptimizer::optimize`], tagged with the
/// path that produced it.
///
/// The variant always matches the variant of the [`PeekOptimizer`] that ran the
/// pipeline.
#[derive(Debug)]
pub enum PeekGlobalLirPlan {
    /// The result of the `SELECT`/`EXPLAIN` pipeline.
    Select(peek::GlobalLirPlan),
    /// The result of the `COPY TO` pipeline.
    CopyTo(copy_to::GlobalLirPlan),
}
157
158impl PeekOptimizer {
159 /// The cluster that will run the optimized dataflow.
160 pub fn cluster_id(&self) -> ComputeInstanceId {
161 match self {
162 PeekOptimizer::Select(optimizer) => optimizer.cluster_id(),
163 PeekOptimizer::CopyTo(optimizer) => optimizer.cluster_id(),
164 }
165 }
166
167 /// Runs the full one-shot optimization pipeline end-to-end:
168 ///
169 /// 1. HIR ⇒ MIR lowering and local MIR optimization,
170 /// 2. timestamp resolution,
171 /// 3. global MIR optimization, MIR ⇒ LIR lowering, and global LIR
172 /// optimization.
173 ///
174 /// The pipeline shape is identical for both variants; only the concrete
175 /// (statement-specific) plan types differ, so the steps are shared via
176 /// [`optimize_oneshot`].
177 pub fn optimize(
178 &mut self,
179 raw_expr: HirRelationExpr,
180 timestamp_ctx: TimestampContext,
181 session: &dyn SessionMetadata,
182 stats: Box<dyn StatisticsOracle>,
183 ) -> Result<PeekGlobalLirPlan, OptimizerError> {
184 match self {
185 PeekOptimizer::Select(optimizer) => {
186 let plan = optimize_oneshot(optimizer, raw_expr, |local_mir_plan| {
187 local_mir_plan.resolve(timestamp_ctx, session, stats)
188 })?;
189 Ok(PeekGlobalLirPlan::Select(plan))
190 }
191 PeekOptimizer::CopyTo(optimizer) => {
192 let plan = optimize_oneshot(optimizer, raw_expr, |local_mir_plan| {
193 local_mir_plan.resolve(timestamp_ctx, session, stats)
194 })?;
195 Ok(PeekGlobalLirPlan::CopyTo(plan))
196 }
197 }
198 }
199
200 /// Consumes `self`, returning the inner [`peek::Optimizer`] if this is the
201 /// `SELECT`/`EXPLAIN` path and `None` otherwise.
202 pub fn into_select(self) -> Option<peek::Optimizer> {
203 match self {
204 PeekOptimizer::Select(optimizer) => Some(optimizer),
205 PeekOptimizer::CopyTo(_) => None,
206 }
207 }
208
209 /// Consumes `self`, returning the inner [`copy_to::Optimizer`] if this is
210 /// the `COPY TO` path and `None` otherwise.
211 pub fn into_copy_to(self) -> Option<copy_to::Optimizer> {
212 match self {
213 PeekOptimizer::CopyTo(optimizer) => Some(optimizer),
214 PeekOptimizer::Select(_) => None,
215 }
216 }
217}
218
219/// Runs the shared one-shot optimization pipeline for a single optimizer.
220///
221/// This factors out the (otherwise duplicated) HIR ⇒ local MIR ⇒ resolve ⇒
222/// global LIR sequence that is common to the `SELECT`/`EXPLAIN` and `COPY TO`
223/// paths. The `resolve` closure attaches the timestamp/session/stats context to
224/// the local plan; it is path-specific only in the concrete plan type it
225/// operates on.
226fn optimize_oneshot<O, LocalPlan, ResolvedPlan, GlobalPlan>(
227 optimizer: &mut O,
228 raw_expr: HirRelationExpr,
229 resolve: impl FnOnce(LocalPlan) -> ResolvedPlan,
230) -> Result<GlobalPlan, OptimizerError>
231where
232 O: Optimize<HirRelationExpr, To = LocalPlan> + Optimize<ResolvedPlan, To = GlobalPlan>,
233{
234 // HIR ⇒ MIR lowering and MIR optimization (local).
235 let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
236 // Attach resolved context required to continue the pipeline.
237 let resolved_mir_plan = resolve(local_mir_plan);
238 // MIR optimization (global), MIR ⇒ LIR lowering, and LIR optimization (global).
239 optimizer.catch_unwind_optimize(resolved_mir_plan)
240}
241
242// Optimizer configuration
243// -----------------------
244
/// Configuration for the optimizer, including its feature flags.
///
/// To add a new feature flag, do the following steps:
///
/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
///    and allow engineers to set a system-wide override:
///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
///
/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
///    inspect plan differences before deploying the optimizer changes:
///    1. Add the flag to the `ExplainPlanOptionName` definition.
///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
///       ...)` macro call.
///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
///       [`mz_repr::explain::ExplainConfig`].
///
/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
///    engineers to experiment with runtime differences before deploying the
///    optimizer changes:
///    1. Add the flag to the `ClusterFeatureName` definition.
///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
///       macro call.
///    3. Extend the `let optimizer_feature_overrides = ...` call in
///       `plan_create_cluster`.
#[derive(Clone, Debug)]
pub struct OptimizerConfig {
    /// The mode in which the optimizer runs.
    pub mode: OptimizeMode,
    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
    ///
    /// This means that it will not consider catalog items (more specifically
    /// indexes) with a [`GlobalId`] greater than or equal to the one provided
    /// here.
    pub replan: Option<GlobalId>,
    /// Show the slow path plan even if a fast path plan was created. Useful
    /// for debugging. Enforced if `timing` is set.
    pub no_fast_path: bool,
    /// If set, allow some additional queries down the Persist fast path when
    /// we believe the orderings are compatible.
    ///
    /// Set from the `PERSIST_FAST_PATH_ORDER` dyncfg in `From<&SystemVars>`.
    persist_fast_path_order: bool,
    /// Enable calculating `with_snapshot` metadata for subscribes.
    ///
    /// Set from the `SUBSCRIBE_SNAPSHOT_OPTIMIZATION` dyncfg in
    /// `From<&SystemVars>`.
    subscribe_snapshot_optimization: bool,
    /// Optimizer feature flags.
    pub features: OptimizerFeatures,
}
291
/// The mode in which an optimizer pipeline runs (see [`OptimizerConfig::mode`]).
///
/// `From<&SystemVars>` starts out in `Execute`; an `EXPLAIN` plan context
/// switches the configuration to `Explain`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// A mode where the optimized statement is executed.
    Execute,
    /// A mode where the optimized statement is explained.
    Explain,
}
299
300impl From<&SystemVars> for OptimizerConfig {
301 fn from(vars: &SystemVars) -> Self {
302 Self {
303 mode: OptimizeMode::Execute,
304 replan: None,
305 no_fast_path: false,
306 persist_fast_path_order: PERSIST_FAST_PATH_ORDER.get(vars.dyncfgs()),
307 subscribe_snapshot_optimization: SUBSCRIBE_SNAPSHOT_OPTIMIZATION.get(vars.dyncfgs()),
308 features: OptimizerFeatures::from(vars),
309 }
310 }
311}
312
313/// Override [`OptimizerConfig::features`] from [`OptimizerFeatureOverrides`].
314impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
315 fn override_from(mut self, overrides: &OptimizerFeatureOverrides) -> Self {
316 self.features = self.features.override_from(overrides);
317 self
318 }
319}
320
321/// [`OptimizerConfig`] overrides coming from an [`ExplainContext`].
322impl OverrideFrom<ExplainContext> for OptimizerConfig {
323 fn override_from(mut self, ctx: &ExplainContext) -> Self {
324 let ExplainContext::Plan(ctx) = ctx else {
325 return self; // Return immediately for all other contexts.
326 };
327
328 // Override general parameters.
329 self.mode = OptimizeMode::Explain;
330 self.replan = ctx.replan;
331 self.no_fast_path = ctx.config.no_fast_path;
332
333 // Override feature flags that can be enabled in the EXPLAIN config.
334 self.features = self.features.override_from(&ctx.config.features);
335
336 // Return the final result.
337 self
338 }
339}
340
341impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
342 fn from(config: &OptimizerConfig) -> Self {
343 Self {
344 enable_new_outer_join_lowering: config.features.enable_new_outer_join_lowering,
345 enable_variadic_left_join_lowering: config.features.enable_variadic_left_join_lowering,
346 enable_cast_elimination: config.features.enable_cast_elimination,
347 enable_simplify_quantified_comparisons: config
348 .features
349 .enable_simplify_quantified_comparisons,
350 }
351 }
352}
353
354// OptimizerCatalog
355// ===============
356
/// The catalog interface available to the optimizer implementations: looking
/// up catalog entries, resolving item names, and finding indexes.
///
/// Implementors must be `Debug + Send + Sync`.
pub trait OptimizerCatalog: Debug + Send + Sync {
    /// Returns the catalog entry for the collection identified by `id`.
    ///
    /// NOTE(review): the signature is infallible, so implementations
    /// presumably panic on unknown IDs — confirm against implementors.
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry;
    /// Returns the catalog entry for the item identified by `id`.
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
    /// Resolves `name` into a [`FullItemName`].
    ///
    /// `conn_id` optionally identifies the connection the name is resolved
    /// for — presumably needed for connection-specific (temporary) schemas;
    /// verify against implementors.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName;

    /// Returns all indexes on the given object and cluster known in the
    /// catalog.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
}
374
375// OptimizerError
376// ===============
377
378/// Error types that can be generated during optimization.
379#[derive(Debug, thiserror::Error)]
380pub enum OptimizerError {
381 #[error("{0}")]
382 PlanError(#[from] PlanError),
383 #[error("{0}")]
384 RecursionLimitError(#[from] RecursionLimitError),
385 #[error("{0}")]
386 TransformError(#[from] TransformError),
387 #[error("{0}")]
388 EvalError(#[from] EvalError),
389 #[error("cannot materialize call to {0}")]
390 UnmaterializableFunction(UnmaterializableFunc),
391 #[error("cannot call {func} in {context} ")]
392 UncallableFunction {
393 func: UnmaterializableFunc,
394 context: &'static str,
395 },
396 #[error("access to function {0} is restricted")]
397 RestrictedFunction(UnmaterializableFunc),
398 #[error("{0}")]
399 UnsupportedTemporalExpression(String),
400 /// This is a specific kind of internal error. It's distinct from `Internal`, because we want to
401 /// catch it and swallow it in some cases.
402 #[error("internal optimizer error: MfpPlan couldn't be converted into SafeMfpPlan")]
403 InternalUnsafeMfpPlan(String),
404 #[error("internal optimizer error: {0}")]
405 Internal(String),
406}
407
408impl From<String> for OptimizerError {
409 fn from(msg: String) -> Self {
410 Self::Internal(msg)
411 }
412}
413
414impl OptimizerError {
415 pub fn detail(&self) -> Option<String> {
416 match self {
417 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
418 Some("See: https://materialize.com/docs/sql/functions/now_and_mz_now/".into())
419 }
420 Self::RestrictedFunction(_) => Some(
421 "Access to system catalog objects is restricted for this role. \
422 Contact your administrator if you need access."
423 .into(),
424 ),
425 _ => None,
426 }
427 }
428
429 pub fn hint(&self) -> Option<String> {
430 match self {
431 Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp) => {
432 Some("In temporal filters `mz_now()` may work instead.".into())
433 }
434 _ => None,
435 }
436 }
437}
438
439impl From<TimestampError> for OptimizerError {
440 fn from(value: TimestampError) -> Self {
441 OptimizerError::EvalError(EvalError::from(value))
442 }
443}
444
445impl From<anyhow::Error> for OptimizerError {
446 fn from(value: anyhow::Error) -> Self {
447 OptimizerError::Internal(value.to_string())
448 }
449}
450
451impl MaybeShouldPanic for OptimizerError {
452 fn should_panic(&self) -> Option<String> {
453 match self {
454 OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)) => {
455 Some(msg.to_string())
456 }
457 _ => None,
458 }
459 }
460}
461
462// Tracing helpers
463// ---------------
464
465#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
466fn optimize_mir_local(
467 expr: MirRelationExpr,
468 ctx: &mut TransformCtx,
469) -> Result<OptimizedMirRelationExpr, OptimizerError> {
470 #[allow(deprecated)]
471 let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
472 let expr = optimizer.optimize(expr, ctx)?;
473
474 // Trace the result of this phase.
475 mz_repr::explain::trace_plan(expr.as_inner());
476
477 Ok::<_, OptimizerError>(expr)
478}
479
480/// This is just a wrapper around [mz_transform::Optimizer::constant_optimizer],
481/// running it, and tracing the result plan.
482#[mz_ore::instrument(target = "optimizer", level = "debug", name = "constant")]
483fn optimize_mir_constant(
484 expr: MirRelationExpr,
485 ctx: &mut TransformCtx,
486 limit: bool,
487) -> Result<MirRelationExpr, OptimizerError> {
488 let optimizer = mz_transform::Optimizer::constant_optimizer(ctx, limit);
489 let expr = optimizer.optimize(expr, ctx)?;
490
491 // Trace the result of this phase.
492 mz_repr::explain::trace_plan(expr.as_inner());
493
494 Ok::<_, OptimizerError>(expr.0)
495}
496
/// Traces `$plan` via [`mz_repr::explain::trace_plan`] inside a debug-level
/// `tracing` span with target `"optimizer"` and name `$span`.
///
/// Usage: `trace_plan!(at: "<span name>", <plan expression>)`; `$span` must be
/// a literal.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
            mz_repr::explain::trace_plan($plan);
        });
    }
}

// Make the `macro_rules!` macro nameable by path (e.g. from the submodules
// declared at the top of this file, which precede the macro's textual scope).
use trace_plan;
506
507use crate::coord::ExplainContext;