mz_adapter/optimize/peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `SELECT` statements.
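//!
//! The optimizer runs in two stages: a context-free stage that lowers HIR and
//! applies local MIR transforms, and a context-dependent stage that needs a
//! timestamp, session, and statistics. A rough sketch of the intended call
//! sequence (illustrative only; the coordinator supplies the actual catalog and
//! cluster snapshots, transient ids, timestamp context, and statistics):
//!
//! ```ignore
//! let mut optimizer = Optimizer::new(
//!     catalog, compute_instance, finishing, select_id, index_id, config, metrics,
//! );
//! // Stage 1: HIR ⇒ MIR lowering, decorrelation, and local MIR optimization.
//! let local_plan = optimizer.optimize(hir_expr)?;
//! // Attach the environment context required by the global stage.
//! let resolved = local_plan.resolve(timestamp_ctx, session, stats);
//! // Stage 2: global MIR optimization, MIR ⇒ LIR lowering, and peek planning.
//! let global_plan = optimizer.optimize(resolved)?;
//! let (peek_plan, df_meta, typ) = global_plan.unapply();
//! ```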

use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant};

use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::IndexDesc;
use mz_compute_types::plan::Plan;
use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
use mz_ore::soft_assert_or_log;
use mz_repr::explain::trace_plan;
use mz_repr::{GlobalId, SqlRelationType, Timestamp};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::HirRelationExpr;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::reprtypecheck::{
    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
};
use mz_transform::{StatisticsOracle, TransformCtx};
use timely::progress::Antichain;
use tracing::debug_span;

use crate::TimestampContext;
use crate::catalog::Catalog;
use crate::coord::infer_sql_type_for_catalog;
use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
};
use crate::optimize::{
    MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
    optimize_mir_local, trace_plan,
};

pub struct Optimizer {
    /// A representation typechecking context to use throughout the optimizer pipeline.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<Catalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// Optional row-set finishing to be applied to the final result.
    finishing: RowSetFinishing,
    /// A transient GlobalId to be used when constructing the dataflow.
    select_id: GlobalId,
    /// A transient GlobalId to be used when constructing a PeekPlan.
    index_id: GlobalId,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    duration: Duration,
}

impl Optimizer {
    pub fn new(
        catalog: Arc<Catalog>,
        compute_instance: ComputeInstanceSnapshot,
        finishing: RowSetFinishing,
        select_id: GlobalId,
        index_id: GlobalId,
        config: OptimizerConfig,
        metrics: OptimizerMetrics,
    ) -> Self {
        Self {
            repr_typecheck_ctx: empty_repr_context(),
            catalog,
            compute_instance,
            finishing,
            select_id,
            index_id,
            config,
            metrics,
            duration: Default::default(),
        }
    }

    pub fn cluster_id(&self) -> ComputeInstanceId {
        self.compute_instance.instance_id()
    }

    pub fn finishing(&self) -> &RowSetFinishing {
        &self.finishing
    }

    pub fn select_id(&self) -> GlobalId {
        self.select_id
    }

    pub fn index_id(&self) -> GlobalId {
        self.index_id
    }

    pub fn config(&self) -> &OptimizerConfig {
        &self.config
    }

    pub fn metrics(&self) -> &OptimizerMetrics {
        &self.metrics
    }

    pub fn duration(&self) -> Duration {
        self.duration
    }
}

// A bogus `Debug` implementation that hides fields. This is needed so that the
// `event!` call in `sequence_peek_stage` does not emit a lot of data.
//
// For now, we skip almost all fields, but we might revisit that if it turns
// out that we really need them for debugging purposes.
impl Debug for Optimizer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OptimizePeek")
            .field("config", &self.config)
            .finish_non_exhaustive()
    }
}

/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
pub struct Unresolved;

/// The (sealed intermediate) result after HIR ⇒ MIR lowering, decorrelation,
/// and local MIR optimization.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    expr: MirRelationExpr,
    typ: SqlRelationType,
    df_meta: DataflowMetainfo,
    context: T,
}

/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    timestamp_ctx: TimestampContext<Timestamp>,
    stats: Box<dyn StatisticsOracle>,
    session: &'s dyn SessionMetadata,
}

/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans,
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
#[derive(Debug)]
pub struct GlobalLirPlan {
    peek_plan: PeekPlan,
    df_meta: DataflowMetainfo,
    typ: SqlRelationType,
}
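
// Illustrative summary of the type-state flow encoded by the marker types above
// (the caller drives the two `Optimize` implementations below in this order):
//
//   HirRelationExpr
//     --optimize-->  LocalMirPlan<Unresolved>
//     --resolve-->   LocalMirPlan<Resolved<'_>>
//     --optimize-->  GlobalLirPlan
//     --unapply-->   (PeekPlan, DataflowMetainfo, SqlRelationType)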

impl Optimize<HirRelationExpr> for Optimizer {
    type To = LocalMirPlan;

    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        // Trace the pipeline input under `optimize/raw`.
        trace_plan!(at: "raw", &expr);

        // HIR ⇒ MIR lowering and decorrelation
        let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;

        // MIR ⇒ MIR optimization (local)
        let mut df_meta = DataflowMetainfo::default();
        let mut transform_ctx = TransformCtx::local(
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
            Some(self.select_id),
        );
        let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
        let typ = infer_sql_type_for_catalog(&expr, &mir_expr);

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(LocalMirPlan {
            expr: mir_expr,
            typ,
            df_meta,
            context: Unresolved,
        })
    }
}

impl LocalMirPlan<Unresolved> {
    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
    /// required for the next stage.
    pub fn resolve(
        self,
        timestamp_ctx: TimestampContext<Timestamp>,
        session: &dyn SessionMetadata,
        stats: Box<dyn StatisticsOracle>,
    ) -> LocalMirPlan<Resolved<'_>> {
        LocalMirPlan {
            expr: self.expr,
            typ: self.typ,
            df_meta: self.df_meta,
            context: Resolved {
                timestamp_ctx,
                session,
                stats,
            },
        }
    }
}

impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
    type To = GlobalLirPlan;

    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let LocalMirPlan {
            expr,
            typ,
            mut df_meta,
            context:
                Resolved {
                    timestamp_ctx,
                    stats,
                    session,
                },
        } = plan;

        let expr = OptimizedMirRelationExpr(expr);

        // We create a dataflow and optimize it, to determine if we can avoid building it.
        // This can happen if the result optimizes to a constant, or to a `Get` expression
        // around a maintained arrangement.
        let key = typ
            .default_key()
            .iter()
            .map(|k| MirScalarExpr::column(*k))
            .collect();

        // The assembled dataflow contains a view and an index of that view.
        let mut df_builder = {
            let catalog = self.catalog.state();
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(catalog, compute).with_config(&self.config)
        };

        let debug_name = format!("oneshot-select-{}", self.select_id);
        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());

        df_builder.import_view_into_dataflow(
            &self.select_id,
            &expr,
            &mut df_desc,
            &self.config.features,
        )?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // Resolve all unmaterializable function calls except mz_now(), because
        // we don't yet have a timestamp.
        let style = ExprPrepOneShot {
            logical_time: EvalTime::Deferred,
            session,
            catalog_state: self.catalog.state(),
        };
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // TODO: Instead of conditioning here we should really
        // reconsider how to render multi-plan peek dataflows. The main
        // difficulty here is rendering the optional finishing bit.
        if self.config.mode != OptimizeMode::Explain {
            df_desc.export_index(
                self.index_id,
                IndexDesc {
                    on_id: self.select_id,
                    key,
                },
                typ.clone(),
            );
        }

        // Set the `as_of` and `until` timestamps for the dataflow.
        df_desc.set_as_of(timestamp_ctx.antichain());

        // Get the single timestamp representing the `as_of` time.
        let as_of = df_desc
            .as_of
            .clone()
            .expect("as_of antichain")
            .into_option()
            .expect("unique as_of element");

        // Resolve all unmaterializable function calls including mz_now().
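        // E.g. (illustrative): a predicate such as `mz_now() <= some_column`
        // was left untouched by the first pass above (EvalTime::Deferred) and
        // is only now replaced with the literal `as_of` timestamp, so it can
        // be evaluated like any other constant comparison.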
        let style = ExprPrepOneShot {
            logical_time: EvalTime::Time(as_of),
            session,
            catalog_state: self.catalog.state(),
        };
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // Use the opportunity to name an `until` frontier that will prevent
        // work we needn't perform. By default, `until` will be
        // `Antichain::new()`, which prevents no updates and is safe.
        //
        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
        // will return `None` and we use the default (empty) `until`. Otherwise,
        // we expect to be able to set `until = as_of + 1` without an overflow, unless
        // we query at the maximum timestamp. In this case, the default empty `until`
        // is the correct choice.
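        //
        // For example (illustrative numbers only): if the peek's `as_of`
        // resolves to timestamp 42, we set `until = Antichain::from_elem(43)`,
        // so the dataflow only needs to be correct for the single time 42 and
        // may ignore updates beyond it.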
        if let Some(until) = timestamp_ctx
            .timestamp()
            .and_then(Timestamp::try_step_forward)
        {
            df_desc.until = Antichain::from_elem(until);
        }

        // Construct TransformCtx for global optimization.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &*stats,
            &self.config.features,
            &self.repr_typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );

        // Already try creating a fast path plan here. If successful, we don't need to run the
        // whole optimizer pipeline, only a tiny subset of it. (We will still need to run
        // `create_fast_path_plan` again later, because, e.g., `LiteralConstraints` has not
        // run yet.)
        let use_fast_path_optimizer = match create_fast_path_plan(
            &mut df_desc,
            self.select_id,
            Some(&self.finishing),
            self.config.features.persist_fast_path_limit,
            self.config.persist_fast_path_order,
        ) {
            Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
            Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
                // This is expected, in that `create_fast_path_plan` can choke on `mz_now`, which we
                // haven't removed yet.
                false
            }
            Err(e) => {
                return Err(e);
            }
        };

        // Run global optimization.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;

        if self.config.mode == OptimizeMode::Explain {
            // Collect the list of indexes used by the dataflow at this point.
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        // TODO: use the following code once we can be sure that the
        // index_exports always exist.
        //
        // let typ = self.df_desc
        //     .index_exports
        //     .first_key_value()
        //     .map(|(_key, (_desc, typ))| typ.clone())
        //     .expect("GlobalMirPlan type");

        let peek_plan = match create_fast_path_plan(
            &mut df_desc,
            self.select_id,
            Some(&self.finishing),
            self.config.features.persist_fast_path_limit,
            self.config.persist_fast_path_order,
        )? {
            Some(plan) if !self.config.no_fast_path => {
                if self.config.mode == OptimizeMode::Explain {
                    // Trace the `used_indexes` for the FastPathPlan.
                    debug_span!(target: "optimizer", "fast_path").in_scope(|| {
                        // Fast path plans come with an updated finishing.
                        let finishing = if !self.finishing.is_trivial(typ.arity()) {
                            Some(&self.finishing)
                        } else {
                            None
                        };
                        trace_plan(&plan.used_indexes(finishing));
                    });
                }
                // Trace the FastPathPlan.
                trace_plan!(at: "fast_path", &plan);

                // Trace the pipeline output under `optimize`.
                trace_plan(&plan);

                // Build the PeekPlan
                PeekPlan::FastPath(plan)
            }
            _ => {
                soft_assert_or_log!(
                    !use_fast_path_optimizer || self.config.no_fast_path,
                    "The fast_path_optimizer shouldn't make a fast path plan slow path."
                );

                // Ensure all expressions are normalized before finalizing.
                for build in df_desc.objects_to_build.iter_mut() {
                    normalize_lets(&mut build.plan.0, &self.config.features)?
                }

                // Finalize the dataflow. This includes:
                // - MIR ⇒ LIR lowering
                // - LIR ⇒ LIR transforms
                let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

                // Trace the pipeline output under `optimize`.
                trace_plan(&df_desc);

                // Build the PeekPlan
                PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
            }
        };

        self.duration += time.elapsed();
        let label = match &peek_plan {
            PeekPlan::FastPath(_) => "peek:fast_path",
            PeekPlan::SlowPath(_) => "peek:slow_path",
        };
        self.metrics
            .observe_e2e_optimization_time(label, self.duration);

        Ok(GlobalLirPlan {
            peek_plan,
            df_meta,
            typ,
        })
    }
}

impl GlobalLirPlan {
    /// Returns a reference to the peek plan.
    pub fn peek_plan(&self) -> &PeekPlan {
        &self.peek_plan
    }

    /// Unwraps the parts of the final result of the optimization pipeline.
    pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
        (self.peek_plan, self.df_meta, self.typ)
    }
}