mz_adapter/optimize/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `SELECT` statements.
11
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, SqlRelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use mz_transform::{StatisticsOracle, TransformCtx};
32use timely::progress::Antichain;
33use tracing::debug_span;
34
35use crate::TimestampContext;
36use crate::catalog::Catalog;
37use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
38use crate::optimize::dataflows::{
39    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrep, ExprPrepOneShot,
40};
41use crate::optimize::{
42    MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
43    optimize_mir_local, trace_plan,
44};
45
/// Optimizer for one-shot `SELECT` statements (peeks).
///
/// Optimization happens in two stages: `HirRelationExpr` ⇒ [`LocalMirPlan`]
/// (local MIR optimization), and then a [`LocalMirPlan`] carrying [`Resolved`]
/// context ⇒ [`GlobalLirPlan`] (global optimization and, if needed, LIR lowering).
pub struct Optimizer {
    /// A representation typechecking context to use throughout the optimizer pipeline.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<Catalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// Row-set finishing to be applied to the final result. This is not an
    /// `Option`; a finishing that does nothing is detected via `is_trivial`.
    finishing: RowSetFinishing,
    /// A transient GlobalId to be used when constructing the dataflow; the
    /// optimized `SELECT` is imported into the dataflow as a view under this id.
    select_id: GlobalId,
    /// A transient GlobalId to be used when constructing a PeekPlan. It is also
    /// the id of the index exported on `select_id` (outside of EXPLAIN mode).
    index_id: GlobalId,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far, accumulated across all
    /// optimization stages run by this optimizer.
    duration: Duration,
}
66
67impl Optimizer {
68    pub fn new(
69        catalog: Arc<Catalog>,
70        compute_instance: ComputeInstanceSnapshot,
71        finishing: RowSetFinishing,
72        select_id: GlobalId,
73        index_id: GlobalId,
74        config: OptimizerConfig,
75        metrics: OptimizerMetrics,
76    ) -> Self {
77        Self {
78            repr_typecheck_ctx: empty_repr_context(),
79            catalog,
80            compute_instance,
81            finishing,
82            select_id,
83            index_id,
84            config,
85            metrics,
86            duration: Default::default(),
87        }
88    }
89
90    pub fn cluster_id(&self) -> ComputeInstanceId {
91        self.compute_instance.instance_id()
92    }
93
94    pub fn finishing(&self) -> &RowSetFinishing {
95        &self.finishing
96    }
97
98    pub fn select_id(&self) -> GlobalId {
99        self.select_id
100    }
101
102    pub fn index_id(&self) -> GlobalId {
103        self.index_id
104    }
105
106    pub fn config(&self) -> &OptimizerConfig {
107        &self.config
108    }
109
110    pub fn metrics(&self) -> &OptimizerMetrics {
111        &self.metrics
112    }
113
114    pub fn duration(&self) -> Duration {
115        self.duration
116    }
117}
118
119// A bogey `Debug` implementation that hides fields. This is needed to make the
120// `event!` call in `sequence_peek_stage` not emit a lot of data.
121//
122// For now, we skip almost all fields, but we might revisit that bit if it turns
123// out that we really need those for debugging purposes.
124impl Debug for Optimizer {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        f.debug_struct("OptimizePeek")
127            .field("config", &self.config)
128            .finish_non_exhaustive()
129    }
130}
131
/// Marker type for [`LocalMirPlan`] representing an optimization result without
/// context.
///
/// Call `LocalMirPlan::resolve` to attach the context needed by the global
/// optimization stage.
pub struct Unresolved;
135
/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
/// and local MIR optimization.
///
/// The type parameter is either [`Unresolved`] (no context attached yet) or
/// [`Resolved`] (ready for the global optimization stage).
#[derive(Clone)]
pub struct LocalMirPlan<T = Unresolved> {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Metainfo collected during local optimization; it is carried into the
    /// global stage and handed on to its result.
    df_meta: DataflowMetainfo,
    /// Contextual information attached to this plan (see the type parameter).
    context: T,
}
144
/// Marker type for [`LocalMirPlan`] structs representing an optimization result
/// with attached environment context required for the next optimization stage.
pub struct Resolved<'s> {
    /// The timestamp context from which the dataflow's `as_of` and `until`
    /// frontiers are derived.
    timestamp_ctx: TimestampContext<Timestamp>,
    /// Statistics oracle handed to the global MIR transforms.
    stats: Box<dyn StatisticsOracle>,
    /// Session metadata used when resolving unmaterializable function calls.
    session: &'s dyn SessionMetadata,
}
152
/// The (final) result after
///
/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
/// 2. transitively inlining referenced views,
/// 3. timestamp resolution,
/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans,
/// 5. MIR ⇒ LIR lowering, and
/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
///
/// Steps 5 and 6 only happen for slow-path peeks; fast-path peeks stop after
/// MIR optimization.
#[derive(Debug)]
pub struct GlobalLirPlan {
    /// Either a fast-path plan or a finalized slow-path dataflow.
    peek_plan: PeekPlan,
    /// Metainfo accumulated during local and global optimization.
    df_meta: DataflowMetainfo,
    /// The relation type of the optimized `SELECT` expression.
    typ: SqlRelationType,
}
167
168impl Optimize<HirRelationExpr> for Optimizer {
169    type To = LocalMirPlan;
170
171    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
172        let time = Instant::now();
173
174        // Trace the pipeline input under `optimize/raw`.
175        trace_plan!(at: "raw", &expr);
176
177        // HIR ⇒ MIR lowering and decorrelation
178        let expr = expr.lower(&self.config, Some(&self.metrics))?;
179
180        // MIR ⇒ MIR optimization (local)
181        let mut df_meta = DataflowMetainfo::default();
182        let mut transform_ctx = TransformCtx::local(
183            &self.config.features,
184            &self.repr_typecheck_ctx,
185            &mut df_meta,
186            Some(&mut self.metrics),
187            Some(self.select_id),
188        );
189        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
190
191        self.duration += time.elapsed();
192
193        // Return the (sealed) plan at the end of this optimization step.
194        Ok(LocalMirPlan {
195            expr,
196            df_meta,
197            context: Unresolved,
198        })
199    }
200}
201
202impl LocalMirPlan<Unresolved> {
203    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
204    /// required for the next stage.
205    pub fn resolve(
206        self,
207        timestamp_ctx: TimestampContext<Timestamp>,
208        session: &dyn SessionMetadata,
209        stats: Box<dyn StatisticsOracle>,
210    ) -> LocalMirPlan<Resolved<'_>> {
211        LocalMirPlan {
212            expr: self.expr,
213            df_meta: self.df_meta,
214            context: Resolved {
215                timestamp_ctx,
216                session,
217                stats,
218            },
219        }
220    }
221}
222
223impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
224    type To = GlobalLirPlan;
225
226    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
227        let time = Instant::now();
228
229        let LocalMirPlan {
230            expr,
231            mut df_meta,
232            context:
233                Resolved {
234                    timestamp_ctx,
235                    stats,
236                    session,
237                },
238        } = plan;
239
240        let expr = OptimizedMirRelationExpr(expr);
241
242        // We create a dataflow and optimize it, to determine if we can avoid building it.
243        // This can happen if the result optimizes to a constant, or to a `Get` expression
244        // around a maintained arrangement.
245        let typ = expr.typ();
246        let key = typ
247            .default_key()
248            .iter()
249            .map(|k| MirScalarExpr::column(*k))
250            .collect();
251
252        // The assembled dataflow contains a view and an index of that view.
253        let mut df_builder = {
254            let catalog = self.catalog.state();
255            let compute = self.compute_instance.clone();
256            DataflowBuilder::new(catalog, compute).with_config(&self.config)
257        };
258
259        let debug_name = format!("oneshot-select-{}", self.select_id);
260        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
261
262        df_builder.import_view_into_dataflow(
263            &self.select_id,
264            &expr,
265            &mut df_desc,
266            &self.config.features,
267        )?;
268        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
269
270        // Resolve all unmaterializable function calls except mz_now(), because
271        // we don't yet have a timestamp.
272        let style = ExprPrepOneShot {
273            logical_time: EvalTime::Deferred,
274            session,
275            catalog_state: self.catalog.state(),
276        };
277        df_desc.visit_children(
278            |r| style.prep_relation_expr(r),
279            |s| style.prep_scalar_expr(s),
280        )?;
281
282        // TODO: Instead of conditioning here we should really
283        // reconsider how to render multi-plan peek dataflows. The main
284        // difficulty here is rendering the optional finishing bit.
285        if self.config.mode != OptimizeMode::Explain {
286            df_desc.export_index(
287                self.index_id,
288                IndexDesc {
289                    on_id: self.select_id,
290                    key,
291                },
292                typ.clone(),
293            );
294        }
295
296        // Set the `as_of` and `until` timestamps for the dataflow.
297        df_desc.set_as_of(timestamp_ctx.antichain());
298
299        // Get the single timestamp representing the `as_of` time.
300        let as_of = df_desc
301            .as_of
302            .clone()
303            .expect("as_of antichain")
304            .into_option()
305            .expect("unique as_of element");
306
307        // Resolve all unmaterializable function calls including mz_now().
308        let style = ExprPrepOneShot {
309            logical_time: EvalTime::Time(as_of),
310            session,
311            catalog_state: self.catalog.state(),
312        };
313        df_desc.visit_children(
314            |r| style.prep_relation_expr(r),
315            |s| style.prep_scalar_expr(s),
316        )?;
317
318        // Use the opportunity to name an `until` frontier that will prevent
319        // work we needn't perform. By default, `until` will be
320        // `Antichain::new()`, which prevents no updates and is safe.
321        //
322        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
323        // will return `None` and we use the default (empty) `until`. Otherwise,
324        // we expect to be able to set `until = as_of + 1` without an overflow, unless
325        // we query at the maximum timestamp. In this case, the default empty `until`
326        // is the correct choice.
327        if let Some(until) = timestamp_ctx
328            .timestamp()
329            .and_then(Timestamp::try_step_forward)
330        {
331            df_desc.until = Antichain::from_elem(until);
332        }
333
334        // Construct TransformCtx for global optimization.
335        let mut transform_ctx = TransformCtx::global(
336            &df_builder,
337            &*stats,
338            &self.config.features,
339            &self.repr_typecheck_ctx,
340            &mut df_meta,
341            Some(&mut self.metrics),
342        );
343
344        // Let's already try creating a fast path plan. If successful, we don't need to run the
345        // whole optimizer pipeline, but just a tiny subset of it. (But we'll need to run
346        // `create_fast_path_plan` later again, because, e.g., running `LiteralConstraints` is still
347        // ahead of us.)
348        let use_fast_path_optimizer = match create_fast_path_plan(
349            &mut df_desc,
350            self.select_id,
351            Some(&self.finishing),
352            self.config.features.persist_fast_path_limit,
353            self.config.persist_fast_path_order,
354        ) {
355            Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
356            Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
357                // This is expected, in that `create_fast_path_plan` can choke on `mz_now`, which we
358                // haven't removed yet.
359                false
360            }
361            Err(e) => {
362                return Err(e);
363            }
364        };
365
366        // Run global optimization.
367        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
368
369        if self.config.mode == OptimizeMode::Explain {
370            // Collect the list of indexes used by the dataflow at this point.
371            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
372        }
373
374        // TODO: use the following code once we can be sure that the
375        // index_exports always exist.
376        //
377        // let typ = self.df_desc
378        //     .index_exports
379        //     .first_key_value()
380        //     .map(|(_key, (_desc, typ))| typ.clone())
381        //     .expect("GlobalMirPlan type");
382
383        let peek_plan = match create_fast_path_plan(
384            &mut df_desc,
385            self.select_id,
386            Some(&self.finishing),
387            self.config.features.persist_fast_path_limit,
388            self.config.persist_fast_path_order,
389        )? {
390            Some(plan) if !self.config.no_fast_path => {
391                if self.config.mode == OptimizeMode::Explain {
392                    // Trace the `used_indexes` for the FastPathPlan.
393                    debug_span!(target: "optimizer", "fast_path").in_scope(|| {
394                        // Fast path plans come with an updated finishing.
395                        let finishing = if !self.finishing.is_trivial(typ.arity()) {
396                            Some(&self.finishing)
397                        } else {
398                            None
399                        };
400                        trace_plan(&plan.used_indexes(finishing));
401                    });
402                }
403                // Trace the FastPathPlan.
404                trace_plan!(at: "fast_path", &plan);
405
406                // Trace the pipeline output under `optimize`.
407                trace_plan(&plan);
408
409                // Build the PeekPlan
410                PeekPlan::FastPath(plan)
411            }
412            _ => {
413                soft_assert_or_log!(
414                    !use_fast_path_optimizer || self.config.no_fast_path,
415                    "The fast_path_optimizer shouldn't make a fast path plan slow path."
416                );
417
418                // Ensure all expressions are normalized before finalizing.
419                for build in df_desc.objects_to_build.iter_mut() {
420                    normalize_lets(&mut build.plan.0, &self.config.features)?
421                }
422
423                // Finalize the dataflow. This includes:
424                // - MIR ⇒ LIR lowering
425                // - LIR ⇒ LIR transforms
426                let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
427
428                // Trace the pipeline output under `optimize`.
429                trace_plan(&df_desc);
430
431                // Build the PeekPlan
432                PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
433            }
434        };
435
436        self.duration += time.elapsed();
437        let label = match &peek_plan {
438            PeekPlan::FastPath(_) => "peek:fast_path",
439            PeekPlan::SlowPath(_) => "peek:slow_path",
440        };
441        self.metrics
442            .observe_e2e_optimization_time(label, self.duration);
443
444        Ok(GlobalLirPlan {
445            peek_plan,
446            df_meta,
447            typ,
448        })
449    }
450}
451
452impl GlobalLirPlan {
453    /// Unwraps the parts of the final result of the optimization pipeline.
454    pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
455        (self.peek_plan, self.df_meta, self.typ)
456    }
457}