// mz_adapter/optimize/peek.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `SELECT` statements.

use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant};

use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::IndexDesc;
use mz_compute_types::plan::Plan;
use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
use mz_ore::soft_assert_or_log;
use mz_repr::explain::trace_plan;
use mz_repr::{GlobalId, ReprRelationType, SqlRelationType, Timestamp};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::HirRelationExpr;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
use mz_transform::{StatisticsOracle, TransformCtx};
use timely::progress::Antichain;
use tracing::debug_span;

use crate::TimestampContext;
use crate::catalog::Catalog;
use crate::coord::infer_sql_type_for_catalog;
use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
use crate::optimize::dataflows::{
    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
};
use crate::optimize::{
    MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
    optimize_mir_local, trace_plan,
};
/// An optimizer for one-shot `SELECT` (peek) queries.
///
/// Bundles the catalog and cluster snapshots plus configuration needed to
/// drive a query through the HIR ⇒ MIR ⇒ LIR optimization pipeline.
pub struct Optimizer {
    /// A representation typechecking context to use throughout the optimizer pipeline.
    typecheck_ctx: SharedTypecheckingContext,
    /// A snapshot of the catalog state.
    catalog: Arc<Catalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// Optional row-set finishing to be applied to the final result.
    finishing: RowSetFinishing,
    /// A transient GlobalId to be used when constructing the dataflow.
    select_id: GlobalId,
    /// A transient GlobalId to be used when constructing a PeekPlan.
    index_id: GlobalId,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    duration: Duration,
}
65
impl Optimizer {
    /// Creates a new `Optimizer` for a one-shot `SELECT`.
    ///
    /// The `typecheck_ctx` is initialized empty and the accumulated
    /// optimization `duration` starts at zero.
    pub fn new(
        catalog: Arc<Catalog>,
        compute_instance: ComputeInstanceSnapshot,
        finishing: RowSetFinishing,
        select_id: GlobalId,
        index_id: GlobalId,
        config: OptimizerConfig,
        metrics: OptimizerMetrics,
    ) -> Self {
        Self {
            typecheck_ctx: empty_typechecking_context(),
            catalog,
            compute_instance,
            finishing,
            select_id,
            index_id,
            config,
            metrics,
            duration: Default::default(),
        }
    }

    /// The ID of the cluster (compute instance) that will run the dataflows.
    pub fn cluster_id(&self) -> ComputeInstanceId {
        self.compute_instance.instance_id()
    }

    /// The row-set finishing applied to the final result.
    pub fn finishing(&self) -> &RowSetFinishing {
        &self.finishing
    }

    /// The transient `GlobalId` used when constructing the dataflow.
    pub fn select_id(&self) -> GlobalId {
        self.select_id
    }

    /// The transient `GlobalId` used when constructing a `PeekPlan`.
    pub fn index_id(&self) -> GlobalId {
        self.index_id
    }

    /// The optimizer configuration.
    pub fn config(&self) -> &OptimizerConfig {
        &self.config
    }

    /// The optimizer metrics sink.
    pub fn metrics(&self) -> &OptimizerMetrics {
        &self.metrics
    }

    /// The total time spent performing optimization so far.
    pub fn duration(&self) -> Duration {
        self.duration
    }
}
117
118// A bogey `Debug` implementation that hides fields. This is needed to make the
119// `event!` call in `sequence_peek_stage` not emit a lot of data.
120//
121// For now, we skip almost all fields, but we might revisit that bit if it turns
122// out that we really need those for debugging purposes.
123impl Debug for Optimizer {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_struct("OptimizePeek")
126            .field("config", &self.config)
127            .finish_non_exhaustive()
128    }
129}
130
/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
pub struct Unresolved;
134
/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
/// and local MIR optimization.
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    // The locally-optimized MIR expression.
    expr: MirRelationExpr,
    // SQL-level relation type of the result.
    typ: SqlRelationType,
    // Metadata (e.g. notices) accumulated while optimizing.
    df_meta: DataflowMetainfo,
    // Either [`Unresolved`] or [`Resolved`] contextual information.
    context: T,
}
144
/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    // The timestamp context the peek will execute at.
    timestamp_ctx: TimestampContext<Timestamp>,
    // Statistics oracle consulted during global optimization.
    stats: Box<dyn StatisticsOracle>,
    // Session metadata used to resolve unmaterializable function calls.
    session: &'s dyn SessionMetadata,
}
152
/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans.
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
#[derive(Debug)]
pub struct GlobalLirPlan {
    // Either a fast-path or a slow-path (dataflow) peek plan.
    peek_plan: PeekPlan,
    // Metadata accumulated over the whole optimization pipeline.
    df_meta: DataflowMetainfo,
    // SQL-level relation type of the result.
    typ: SqlRelationType,
}
167
168impl Optimize<HirRelationExpr> for Optimizer {
169    type To = LocalMirPlan;
170
171    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
172        let time = Instant::now();
173
174        // Trace the pipeline input under `optimize/raw`.
175        trace_plan!(at: "raw", &expr);
176
177        // HIR ⇒ MIR lowering and decorrelation
178        let mir_expr = expr.clone().lower(&self.config, Some(&self.metrics))?;
179
180        // MIR ⇒ MIR optimization (local)
181        let mut df_meta = DataflowMetainfo::default();
182        let mut transform_ctx = TransformCtx::local(
183            &self.config.features,
184            &self.typecheck_ctx,
185            &mut df_meta,
186            Some(&mut self.metrics),
187            Some(self.select_id),
188        );
189        let mir_expr = optimize_mir_local(mir_expr, &mut transform_ctx)?.into_inner();
190        let typ = infer_sql_type_for_catalog(&expr, &mir_expr);
191
192        self.duration += time.elapsed();
193
194        // Return the (sealed) plan at the end of this optimization step.
195        Ok(LocalMirPlan {
196            expr: mir_expr,
197            typ,
198            df_meta,
199            context: Unresolved,
200        })
201    }
202}
203
204impl LocalMirPlan<Unresolved> {
205    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
206    /// required for the next stage.
207    pub fn resolve(
208        self,
209        timestamp_ctx: TimestampContext<Timestamp>,
210        session: &dyn SessionMetadata,
211        stats: Box<dyn StatisticsOracle>,
212    ) -> LocalMirPlan<Resolved<'_>> {
213        LocalMirPlan {
214            expr: self.expr,
215            typ: self.typ,
216            df_meta: self.df_meta,
217            context: Resolved {
218                timestamp_ctx,
219                session,
220                stats,
221            },
222        }
223    }
224}
225
226impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
227    type To = GlobalLirPlan;
228
229    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
230        let time = Instant::now();
231
232        let LocalMirPlan {
233            expr,
234            typ,
235            mut df_meta,
236            context:
237                Resolved {
238                    timestamp_ctx,
239                    stats,
240                    session,
241                },
242        } = plan;
243
244        let expr = OptimizedMirRelationExpr(expr);
245
246        // We create a dataflow and optimize it, to determine if we can avoid building it.
247        // This can happen if the result optimizes to a constant, or to a `Get` expression
248        // around a maintained arrangement.
249        let key = typ
250            .default_key()
251            .iter()
252            .map(|k| MirScalarExpr::column(*k))
253            .collect();
254
255        // The assembled dataflow contains a view and an index of that view.
256        let mut df_builder = {
257            let catalog = self.catalog.state();
258            let compute = self.compute_instance.clone();
259            DataflowBuilder::new(catalog, compute).with_config(&self.config)
260        };
261
262        let debug_name = format!("oneshot-select-{}", self.select_id);
263        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
264
265        df_builder.import_view_into_dataflow(
266            &self.select_id,
267            &expr,
268            &mut df_desc,
269            &self.config.features,
270        )?;
271        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
272
273        // Resolve all unmaterializable function calls except mz_now(), because
274        // we don't yet have a timestamp.
275        let style = ExprPrepOneShot {
276            logical_time: EvalTime::Deferred,
277            session,
278            catalog_state: self.catalog.state(),
279        };
280        df_desc.visit_children(
281            |r| style.prep_relation_expr(r),
282            |s| style.prep_scalar_expr(s),
283        )?;
284
285        // TODO: Instead of conditioning here we should really
286        // reconsider how to render multi-plan peek dataflows. The main
287        // difficulty here is rendering the optional finishing bit.
288        if self.config.mode != OptimizeMode::Explain {
289            df_desc.export_index(
290                self.index_id,
291                IndexDesc {
292                    on_id: self.select_id,
293                    key,
294                },
295                ReprRelationType::from(&typ),
296            );
297        }
298
299        // Set the `as_of` and `until` timestamps for the dataflow.
300        df_desc.set_as_of(timestamp_ctx.antichain());
301
302        // Get the single timestamp representing the `as_of` time.
303        let as_of = df_desc
304            .as_of
305            .clone()
306            .expect("as_of antichain")
307            .into_option()
308            .expect("unique as_of element");
309
310        // Resolve all unmaterializable function calls including mz_now().
311        let style = ExprPrepOneShot {
312            logical_time: EvalTime::Time(as_of),
313            session,
314            catalog_state: self.catalog.state(),
315        };
316        df_desc.visit_children(
317            |r| style.prep_relation_expr(r),
318            |s| style.prep_scalar_expr(s),
319        )?;
320
321        // Use the opportunity to name an `until` frontier that will prevent
322        // work we needn't perform. By default, `until` will be
323        // `Antichain::new()`, which prevents no updates and is safe.
324        //
325        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
326        // will return `None` and we use the default (empty) `until`. Otherwise,
327        // we expect to be able to set `until = as_of + 1` without an overflow, unless
328        // we query at the maximum timestamp. In this case, the default empty `until`
329        // is the correct choice.
330        if let Some(until) = timestamp_ctx
331            .timestamp()
332            .and_then(Timestamp::try_step_forward)
333        {
334            df_desc.until = Antichain::from_elem(until);
335        }
336
337        // Construct TransformCtx for global optimization.
338        let mut transform_ctx = TransformCtx::global(
339            &df_builder,
340            &*stats,
341            &self.config.features,
342            &self.typecheck_ctx,
343            &mut df_meta,
344            Some(&mut self.metrics),
345        );
346
347        // Let's already try creating a fast path plan. If successful, we don't need to run the
348        // whole optimizer pipeline, but just a tiny subset of it. (But we'll need to run
349        // `create_fast_path_plan` later again, because, e.g., running `LiteralConstraints` is still
350        // ahead of us.)
351        let use_fast_path_optimizer = match create_fast_path_plan(
352            &mut df_desc,
353            self.select_id,
354            Some(&self.finishing),
355            self.config.features.persist_fast_path_limit,
356            self.config.persist_fast_path_order,
357        ) {
358            Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
359            Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
360                // This is expected, in that `create_fast_path_plan` can choke on `mz_now`, which we
361                // haven't removed yet.
362                false
363            }
364            Err(e) => {
365                return Err(e);
366            }
367        };
368
369        // Run global optimization.
370        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
371
372        if self.config.mode == OptimizeMode::Explain {
373            // Collect the list of indexes used by the dataflow at this point.
374            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
375        }
376
377        // TODO: use the following code once we can be sure that the
378        // index_exports always exist.
379        //
380        // let typ = self.df_desc
381        //     .index_exports
382        //     .first_key_value()
383        //     .map(|(_key, (_desc, typ))| typ.clone())
384        //     .expect("GlobalMirPlan type");
385
386        let peek_plan = match create_fast_path_plan(
387            &mut df_desc,
388            self.select_id,
389            Some(&self.finishing),
390            self.config.features.persist_fast_path_limit,
391            self.config.persist_fast_path_order,
392        )? {
393            Some(plan) if !self.config.no_fast_path => {
394                if self.config.mode == OptimizeMode::Explain {
395                    // Trace the `used_indexes` for the FastPathPlan.
396                    debug_span!(target: "optimizer", "fast_path").in_scope(|| {
397                        // Fast path plans come with an updated finishing.
398                        let finishing = if !self.finishing.is_trivial(typ.arity()) {
399                            Some(&self.finishing)
400                        } else {
401                            None
402                        };
403                        trace_plan(&plan.used_indexes(finishing));
404                    });
405                }
406                // Trace the FastPathPlan.
407                trace_plan!(at: "fast_path", &plan);
408
409                // Trace the pipeline output under `optimize`.
410                trace_plan(&plan);
411
412                // Build the PeekPlan
413                PeekPlan::FastPath(plan)
414            }
415            _ => {
416                soft_assert_or_log!(
417                    !use_fast_path_optimizer || self.config.no_fast_path,
418                    "The fast_path_optimizer shouldn't make a fast path plan slow path."
419                );
420
421                // Ensure all expressions are normalized before finalizing.
422                for build in df_desc.objects_to_build.iter_mut() {
423                    normalize_lets(&mut build.plan.0, &self.config.features)?
424                }
425
426                // Finalize the dataflow. This includes:
427                // - MIR ⇒ LIR lowering
428                // - LIR ⇒ LIR transforms
429                let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
430
431                // Trace the pipeline output under `optimize`.
432                trace_plan(&df_desc);
433
434                // Build the PeekPlan
435                PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
436            }
437        };
438
439        self.duration += time.elapsed();
440        let label = match &peek_plan {
441            PeekPlan::FastPath(_) => "peek:fast_path",
442            PeekPlan::SlowPath(_) => "peek:slow_path",
443        };
444        self.metrics
445            .observe_e2e_optimization_time(label, self.duration);
446
447        Ok(GlobalLirPlan {
448            peek_plan,
449            df_meta,
450            typ,
451        })
452    }
453}
454
455impl GlobalLirPlan {
456    /// Returns a reference to the peek plan.
457    pub fn peek_plan(&self) -> &PeekPlan {
458        &self.peek_plan
459    }
460
461    /// Unwraps the parts of the final result of the optimization pipeline.
462    pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
463        (self.peek_plan, self.df_meta, self.typ)
464    }
465}