1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Optimizer interface to the adapter and coordinator code.
//!
//! The goal of this crate is to abstract optimizer specifics behind a
//! high-level interface that is ready to be consumed by the coordinator code in
//! a future-proof way (that is, the API is taking the upcoming evolution of
//! these components into account).
//!
//! The contents of this crate should have minimal dependencies to the rest of
//! the coordinator code so we can pull them out as a separate crate in the
//! future without too much effort.
//!
//! The main type in this module is a very simple [`Optimize`] trait which
//! allows us to adhere to the following principles:
//!
//! - Implementors of this trait are structs that encapsulate all context
//! required to optimize a statement of type `T` end-to-end (for example
//! [`materialized_view::Optimizer`] for `T` = `MaterializedView`).
//! - Each struct implements [`Optimize`] once for each optimization stage. The
//! `From` type represents the input of the stage and `Self::To` the
//! associated stage output. This allows a pipeline to have more than one
//! entrypoint.
//! - The concrete types used for stage results are opaque structs that are
//! specific to the pipeline of that statement type.
//! - We use different structs even if two statement types might have
//! structurally identical intermediate results. This ensures that client
//! code cannot first execute some optimization stages for one type and then
//! some stages for a different type.
//! - The only way to construct such a struct is by running the [`Optimize`]
//! stage that produces it. This ensures that client code cannot interfere
//! with the pipeline.
//! - In general, the internals of these structs can be accessed only behind a
//! shared reference. This ensures that client code can look up information
//! from intermediate stages but cannot modify it.
//! - Timestamp selection is modeled as a conversion between structs that are
//! adjacent in the pipeline using a method called `resolve`.
//! - The struct representing the result of the final stage of the
//! optimization pipeline can be destructed to access its internals with a
//! method called `unapply`.
//! - The `Send` trait bounds on the `Self` and `From` types ensure that
//! [`Optimize`] instances can be passed to different threads (this is
//! required for off-thread optimization).
//!
//! For details, see the `20230714_optimizer_interface.md` design doc in this
//! repository.
pub mod copy_to;
pub mod dataflows;
pub mod index;
pub mod materialized_view;
pub mod peek;
pub mod subscribe;
pub mod view;
use std::fmt::Debug;
use std::panic::AssertUnwindSafe;
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::memory::objects::{CatalogEntry, Index};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_controller_types::ClusterId;
use mz_expr::{EvalError, MirRelationExpr, OptimizedMirRelationExpr, UnmaterializableFunc};
use mz_ore::stack::RecursionLimitError;
use mz_repr::adt::timestamp::TimestampError;
use mz_repr::optimize::{OptimizerFeatureOverrides, OptimizerFeatures, OverrideFrom};
use mz_repr::{CatalogItemId, GlobalId};
use mz_sql::names::{FullItemName, QualifiedItemName};
use mz_sql::plan::PlanError;
use mz_sql::session::vars::SystemVars;
use mz_transform::{TransformCtx, TransformError};
// Alias types
// -----------
/// A [`DataflowDescription`] whose plans are [`OptimizedMirRelationExpr`]s
/// (that is, MIR plans).
///
/// The alias is private to this module tree and is used internally by the
/// optimizer implementations.
type MirDataflowDescription = DataflowDescription<OptimizedMirRelationExpr>;
/// A [`DataflowDescription`] whose plans are [`Plan`]s (that is, LIR plans).
///
/// The alias is private to this module tree and is used internally by the
/// optimizer implementations.
type LirDataflowDescription = DataflowDescription<Plan>;
// Core API
// --------
/// A trait that represents an optimization stage.
///
/// The trait is implemented by structs that encapsulate the context needed to
/// run an end-to-end optimization pipeline for a specific statement type
/// (`Index`, `View`, `MaterializedView`, `Subscribe`, `Select`).
///
/// Each implementation represents a concrete optimization stage for a fixed
/// statement type that consumes an input of type `From` and produces output of
/// type `Self::To`.
///
/// The `Send` bounds on `Self`, `From`, and `Self::To` allow a stage, its
/// input, and its output to be moved to a different thread.
pub trait Optimize<From>: Send
where
    From: Send,
{
    /// The output type produced by this optimization stage.
    type To: Send;

    /// Execute the optimization stage, transforming the input plan of type
    /// `From` to an output plan of type `To`.
    ///
    /// # Errors
    ///
    /// Returns an [`OptimizerError`] if the stage fails.
    fn optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError>;

    /// Like [`Self::optimize`], but additionally ensures that panics occurring
    /// in the [`Self::optimize`] call are caught and demoted to an
    /// [`OptimizerError::Internal`] error.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`Self::optimize`] returned, or an
    /// [`OptimizerError::Internal`] carrying the panic message if the call
    /// panicked.
    ///
    /// # Panics
    ///
    /// Panics if [`Self::optimize`] returns a
    /// [`TransformError::CallerShouldPanic`] error: that error is deliberately
    /// promoted back to a panic.
    #[mz_ore::instrument(target = "optimizer", level = "debug", name = "optimize")]
    fn catch_unwind_optimize(&mut self, plan: From) -> Result<Self::To, OptimizerError> {
        match mz_ore::panic::catch_unwind_str(AssertUnwindSafe(|| self.optimize(plan))) {
            Ok(Err(OptimizerError::TransformError(TransformError::CallerShouldPanic(msg)))) => {
                // Promote a `CallerShouldPanic` error from the result to a
                // proper panic. This is needed in order to ensure that
                // `mz_unsafe.mz_panic('forced panic')` calls still panic the
                // caller.
                panic!("{}", msg)
            }
            // Every other outcome of the stage (success or error) is passed
            // through unchanged.
            Ok(result) => result,
            Err(panic) => {
                let msg = format!("unexpected panic during query optimization: {panic}");
                Err(OptimizerError::Internal(msg))
            }
        }
    }

    /// Execute the optimization stage and panic if an error occurs.
    ///
    /// See [`Optimize::optimize`].
    ///
    /// # Panics
    ///
    /// Panics with the error's display text if [`Self::optimize`] fails.
    #[allow(dead_code)] // This function is never used, but it's useful to keep around.
    fn must_optimize(&mut self, expr: From) -> Self::To {
        self.optimize(expr)
            .unwrap_or_else(|err| panic!("must_optimize call failed: {err}"))
    }
}
// Optimizer configuration
// -----------------------
/// Configuration of an optimizer run, including the optimizer feature flags.
///
/// To add a new feature flag, do the following steps:
///
/// 1. To make the flag available to all stages in our [`Optimize`] pipelines
///    and allow engineers to set a system-wide override:
///    1. Add the flag to the `optimizer_feature_flags!(...)` macro call.
///    2. Add the flag to the `feature_flags!(...)` macro call and extend the
///       `From<&SystemVars>` implementation for [`OptimizerFeatures`].
///
/// 2. To enable `EXPLAIN ... WITH(...)` overrides which will allow engineers to
///    inspect plan differences before deploying the optimizer changes:
///    1. Add the flag to the `ExplainPlanOptionName` definition.
///    2. Add the flag to the `generate_extracted_config!(ExplainPlanOption,
///       ...)` macro call.
///    3. Extend the `TryFrom<ExplainPlanOptionExtracted>` implementation for
///       [`mz_repr::explain::ExplainConfig`].
///
/// 3. To enable `CLUSTER ... FEATURES(...)` overrides which will allow
///    engineers to experiment with runtime differences before deploying the
///    optimizer changes:
///    1. Add the flag to the `ClusterFeatureName` definition.
///    2. Add the flag to the `generate_extracted_config!(ClusterFeature, ...)`
///       macro call.
///    3. Extend the `let optimizer_feature_overrides = ...` call in
///       `plan_create_cluster`.
#[derive(Clone, Debug)]
pub struct OptimizerConfig {
    /// The mode in which the optimizer runs.
    pub mode: OptimizeMode,
    /// If the [`GlobalId`] is set the optimizer works in "replan" mode.
    ///
    /// This means that it will not consider catalog items (more specifically
    /// indexes) with [`GlobalId`] greater or equal than the one provided here.
    pub replan: Option<GlobalId>,
    /// Show the slow path plan even if a fast path plan was created. Useful for debugging.
    /// Enforced if `timing` is set.
    pub no_fast_path: bool,
    /// Optimizer feature flags.
    pub features: OptimizerFeatures,
}
/// The mode in which the optimizer runs (see [`OptimizerConfig::mode`]).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OptimizeMode {
    /// A mode where the optimized statement is executed.
    Execute,
    /// A mode where the optimized statement is explained.
    Explain,
}
impl From<&SystemVars> for OptimizerConfig {
fn from(vars: &SystemVars) -> Self {
Self {
mode: OptimizeMode::Execute,
replan: None,
no_fast_path: false,
features: OptimizerFeatures::from(vars),
}
}
}
/// Applies [`OptimizerFeatureOverrides`] to [`OptimizerConfig::features`].
///
/// All other fields of the configuration are carried over unchanged.
impl OverrideFrom<OptimizerFeatureOverrides> for OptimizerConfig {
    fn override_from(self, overrides: &OptimizerFeatureOverrides) -> Self {
        Self {
            features: self.features.override_from(overrides),
            ..self
        }
    }
}
/// Applies the overrides carried by an [`ExplainContext`] to an
/// [`OptimizerConfig`].
///
/// Only [`ExplainContext::Plan`] carries overrides; for every other context the
/// configuration is returned as is.
impl OverrideFrom<ExplainContext> for OptimizerConfig {
    fn override_from(self, ctx: &ExplainContext) -> Self {
        match ctx {
            ExplainContext::Plan(explain) => Self {
                // General parameters.
                mode: OptimizeMode::Explain,
                replan: explain.replan,
                no_fast_path: explain.config.no_fast_path,
                // Feature flags that can be enabled in the EXPLAIN config.
                features: self.features.override_from(&explain.config.features),
            },
            // Nothing to override for all other contexts.
            _ => self,
        }
    }
}
/// Builds an [`mz_sql::plan::HirToMirConfig`] by copying the corresponding
/// flags out of [`OptimizerConfig::features`].
impl From<&OptimizerConfig> for mz_sql::plan::HirToMirConfig {
    fn from(config: &OptimizerConfig) -> Self {
        // Only the feature flags are consulted; the rest of `config` is unused.
        let flags = &config.features;
        Self {
            enable_new_outer_join_lowering: flags.enable_new_outer_join_lowering,
            enable_variadic_left_join_lowering: flags.enable_variadic_left_join_lowering,
            enable_value_window_function_fusion: flags.enable_value_window_function_fusion,
            enable_window_aggregation_fusion: flags.enable_window_aggregation_fusion,
        }
    }
}
// OptimizerCatalog
// ===============
/// The catalog interface that the optimizer relies on.
///
/// Every method takes `&self`, and implementors are required to be
/// `Debug + Send + Sync`.
pub trait OptimizerCatalog: Debug + Send + Sync {
    /// Returns the [`CatalogEntry`] for the given [`GlobalId`].
    ///
    /// NOTE(review): the return type is not an `Option`, so implementations
    /// presumably panic on an unknown id -- confirm against the implementors.
    fn get_entry(&self, id: &GlobalId) -> &CatalogEntry;
    /// Returns the [`CatalogEntry`] for the given [`CatalogItemId`].
    ///
    /// NOTE(review): as with [`Self::get_entry`], behavior for an unknown id is
    /// not visible here -- confirm against the implementors.
    fn get_entry_by_item_id(&self, id: &CatalogItemId) -> &CatalogEntry;
    /// Turns a [`QualifiedItemName`] into a [`FullItemName`].
    ///
    /// NOTE(review): `conn_id` is presumably needed to resolve names that are
    /// specific to a connection -- confirm against the implementors.
    fn resolve_full_name(
        &self,
        name: &QualifiedItemName,
        conn_id: Option<&ConnectionId>,
    ) -> FullItemName;
    /// Returns all indexes on the given object and cluster known in the
    /// catalog.
    ///
    /// Each item pairs the [`GlobalId`] of an index with a reference to its
    /// [`Index`] definition; the iterator borrows from `self`.
    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &Index)> + '_>;
}
// OptimizerError
// ===============
/// Error types that can be generated during optimization.
#[derive(Debug, thiserror::Error)]
pub enum OptimizerError {
#[error("{0}")]
PlanError(#[from] PlanError),
#[error("{0}")]
RecursionLimitError(#[from] RecursionLimitError),
#[error("{0}")]
TransformError(#[from] TransformError),
#[error("{0}")]
EvalError(#[from] EvalError),
#[error("cannot materialize call to {0}")]
UnmaterializableFunction(UnmaterializableFunc),
#[error("cannot call {func} in {context} ")]
UncallableFunction {
func: UnmaterializableFunc,
context: &'static str,
},
#[error("MfpPlan couldn't be converted into SafeMfpPlan")]
UnsafeMfpPlan,
#[error("internal optimizer error: {0}")]
Internal(String),
}
impl From<String> for OptimizerError {
fn from(msg: String) -> Self {
Self::Internal(msg)
}
}
impl OptimizerError {
    /// Returns additional detail text for this error, if there is any.
    ///
    /// Only an [`OptimizerError::UnmaterializableFunction`] error for
    /// [`UnmaterializableFunc::CurrentTimestamp`] has a detail.
    pub fn detail(&self) -> Option<String> {
        let is_current_timestamp = matches!(
            self,
            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp)
        );
        is_current_timestamp
            .then(|| String::from("See: https://materialize.com/docs/sql/functions/now_and_mz_now/"))
    }

    /// Returns a hint for resolving this error, if there is any.
    ///
    /// Only an [`OptimizerError::UnmaterializableFunction`] error for
    /// [`UnmaterializableFunc::CurrentTimestamp`] has a hint.
    pub fn hint(&self) -> Option<String> {
        let is_current_timestamp = matches!(
            self,
            Self::UnmaterializableFunction(UnmaterializableFunc::CurrentTimestamp)
        );
        is_current_timestamp.then(|| String::from("In temporal filters `mz_now()` may work instead."))
    }
}
impl From<TimestampError> for OptimizerError {
fn from(value: TimestampError) -> Self {
OptimizerError::EvalError(EvalError::from(value))
}
}
impl From<anyhow::Error> for OptimizerError {
fn from(value: anyhow::Error) -> Self {
OptimizerError::Internal(value.to_string())
}
}
// Tracing helpers
// ---------------
/// Runs the logical optimizer from `mz_transform` on `expr` and traces the
/// resulting plan.
///
/// The call is instrumented as a `debug`-level span named `local` with target
/// `optimizer`.
///
/// # Errors
///
/// Returns an [`OptimizerError`] if the optimizer run fails.
#[mz_ore::instrument(target = "optimizer", level = "debug", name = "local")]
fn optimize_mir_local(
    expr: MirRelationExpr,
    ctx: &mut TransformCtx,
) -> Result<OptimizedMirRelationExpr, OptimizerError> {
    // NOTE(review): presumably `logical_optimizer` is the deprecated item that
    // this `allow` is for -- confirm against `mz_transform`.
    #[allow(deprecated)]
    let logical = mz_transform::Optimizer::logical_optimizer(ctx);
    let optimized = logical.optimize(expr, ctx)?;

    // Trace the result of this phase.
    mz_repr::explain::trace_plan(optimized.as_inner());

    Ok(optimized)
}
/// Traces a plan inside a dedicated tracing span.
///
/// Invoked as `trace_plan!(at: <span name literal>, <plan expression>)`. The
/// macro creates a `debug`-level span with target `"optimizer"` and the given
/// name, and calls `mz_repr::explain::trace_plan` on the plan expression while
/// that span is entered.
macro_rules! trace_plan {
    (at: $span:literal, $plan:expr) => {
        // `in_scope` enters the span only for the duration of the closure.
        tracing::debug_span!(target: "optimizer", $span).in_scope(|| {
            mz_repr::explain::trace_plan($plan);
        });
    }
}
use trace_plan;
use crate::coord::ExplainContext;