mz_adapter/optimize/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `SELECT` statements.
11
12use std::fmt::Debug;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use mz_compute_types::ComputeInstanceId;
17use mz_compute_types::dataflows::IndexDesc;
18use mz_compute_types::plan::Plan;
19use mz_expr::{MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing};
20use mz_ore::soft_assert_or_log;
21use mz_repr::explain::trace_plan;
22use mz_repr::{GlobalId, SqlRelationType, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::HirRelationExpr;
25use mz_sql::session::metadata::SessionMetadata;
26use mz_transform::dataflow::DataflowMetainfo;
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
32use mz_transform::{StatisticsOracle, TransformCtx};
33use timely::progress::Antichain;
34use tracing::debug_span;
35
36use crate::TimestampContext;
37use crate::catalog::Catalog;
38use crate::coord::peek::{PeekDataflowPlan, PeekPlan, create_fast_path_plan};
39use crate::optimize::dataflows::{
40    ComputeInstanceSnapshot, DataflowBuilder, EvalTime, ExprPrepStyle, prep_relation_expr,
41    prep_scalar_expr,
42};
43use crate::optimize::{
44    MirDataflowDescription, Optimize, OptimizeMode, OptimizerConfig, OptimizerError,
45    optimize_mir_local, trace_plan,
46};
47
48pub struct Optimizer {
49    /// A typechecking context to use throughout the optimizer pipeline.
50    typecheck_ctx: TypecheckContext,
51    /// A representation typechecking context to use throughout the optimizer pipeline.
52    repr_typecheck_ctx: ReprTypecheckContext,
53    /// A snapshot of the catalog state.
54    catalog: Arc<Catalog>,
55    /// A snapshot of the cluster that will run the dataflows.
56    compute_instance: ComputeInstanceSnapshot,
57    /// Optional row-set finishing to be applied to the final result.
58    finishing: RowSetFinishing,
59    /// A transient GlobalId to be used when constructing the dataflow.
60    select_id: GlobalId,
61    /// A transient GlobalId to be used when constructing a PeekPlan.
62    index_id: GlobalId,
63    /// Optimizer config.
64    config: OptimizerConfig,
65    /// Optimizer metrics.
66    metrics: OptimizerMetrics,
67    /// The time spent performing optimization so far.
68    duration: Duration,
69}
70
71impl Optimizer {
72    pub fn new(
73        catalog: Arc<Catalog>,
74        compute_instance: ComputeInstanceSnapshot,
75        finishing: RowSetFinishing,
76        select_id: GlobalId,
77        index_id: GlobalId,
78        config: OptimizerConfig,
79        metrics: OptimizerMetrics,
80    ) -> Self {
81        Self {
82            typecheck_ctx: empty_context(),
83            repr_typecheck_ctx: empty_repr_context(),
84            catalog,
85            compute_instance,
86            finishing,
87            select_id,
88            index_id,
89            config,
90            metrics,
91            duration: Default::default(),
92        }
93    }
94
95    pub fn cluster_id(&self) -> ComputeInstanceId {
96        self.compute_instance.instance_id()
97    }
98
99    pub fn finishing(&self) -> &RowSetFinishing {
100        &self.finishing
101    }
102
103    pub fn select_id(&self) -> GlobalId {
104        self.select_id
105    }
106
107    pub fn index_id(&self) -> GlobalId {
108        self.index_id
109    }
110
111    pub fn config(&self) -> &OptimizerConfig {
112        &self.config
113    }
114
115    pub fn metrics(&self) -> &OptimizerMetrics {
116        &self.metrics
117    }
118
119    pub fn duration(&self) -> Duration {
120        self.duration
121    }
122}
123
124// A bogey `Debug` implementation that hides fields. This is needed to make the
125// `event!` call in `sequence_peek_stage` not emit a lot of data.
126//
127// For now, we skip almost all fields, but we might revisit that bit if it turns
128// out that we really need those for debugging purposes.
129impl Debug for Optimizer {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct("OptimizePeek")
132            .field("config", &self.config)
133            .finish_non_exhaustive()
134    }
135}
136
/// Context marker for a [`LocalMirPlan`] that has not yet been given the
/// environment context needed by the next optimization stage.
pub struct Unresolved;
140
141/// The (sealed intermediate) result after HIR ⇒ MIR lowering and decorrelation
142/// and local MIR optimization.
143#[derive(Clone)]
144pub struct LocalMirPlan<T = Unresolved> {
145    expr: MirRelationExpr,
146    df_meta: DataflowMetainfo,
147    context: T,
148}
149
150/// Marker type for [`LocalMirPlan`] structs representing an optimization result
151/// with attached environment context required for the next optimization stage.
152pub struct Resolved<'s> {
153    timestamp_ctx: TimestampContext<Timestamp>,
154    stats: Box<dyn StatisticsOracle>,
155    session: &'s dyn SessionMetadata,
156}
157
158/// The (final) result after
159///
160/// 1. embedding a [`LocalMirPlan`] into a `DataflowDescription`,
161/// 2. transitively inlining referenced views,
162/// 3. timestamp resolution,
163/// 4. optimizing the resulting `DataflowDescription` with `MIR` plans.
164/// 5. MIR ⇒ LIR lowering, and
165/// 6. optimizing the resulting `DataflowDescription` with `LIR` plans.
166#[derive(Debug)]
167pub struct GlobalLirPlan {
168    peek_plan: PeekPlan,
169    df_meta: DataflowMetainfo,
170    typ: SqlRelationType,
171}
172
173impl Optimize<HirRelationExpr> for Optimizer {
174    type To = LocalMirPlan;
175
176    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
177        let time = Instant::now();
178
179        // Trace the pipeline input under `optimize/raw`.
180        trace_plan!(at: "raw", &expr);
181
182        // HIR ⇒ MIR lowering and decorrelation
183        let expr = expr.lower(&self.config, Some(&self.metrics))?;
184
185        // MIR ⇒ MIR optimization (local)
186        let mut df_meta = DataflowMetainfo::default();
187        let mut transform_ctx = TransformCtx::local(
188            &self.config.features,
189            &self.typecheck_ctx,
190            &self.repr_typecheck_ctx,
191            &mut df_meta,
192            Some(&self.metrics),
193            Some(self.select_id),
194        );
195        let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();
196
197        self.duration += time.elapsed();
198
199        // Return the (sealed) plan at the end of this optimization step.
200        Ok(LocalMirPlan {
201            expr,
202            df_meta,
203            context: Unresolved,
204        })
205    }
206}
207
208impl LocalMirPlan<Unresolved> {
209    /// Produces the [`LocalMirPlan`] with [`Resolved`] contextual information
210    /// required for the next stage.
211    pub fn resolve(
212        self,
213        timestamp_ctx: TimestampContext<Timestamp>,
214        session: &dyn SessionMetadata,
215        stats: Box<dyn StatisticsOracle>,
216    ) -> LocalMirPlan<Resolved<'_>> {
217        LocalMirPlan {
218            expr: self.expr,
219            df_meta: self.df_meta,
220            context: Resolved {
221                timestamp_ctx,
222                session,
223                stats,
224            },
225        }
226    }
227}
228
229impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
230    type To = GlobalLirPlan;
231
232    fn optimize(&mut self, plan: LocalMirPlan<Resolved<'s>>) -> Result<Self::To, OptimizerError> {
233        let time = Instant::now();
234
235        let LocalMirPlan {
236            expr,
237            mut df_meta,
238            context:
239                Resolved {
240                    timestamp_ctx,
241                    stats,
242                    session,
243                },
244        } = plan;
245
246        let expr = OptimizedMirRelationExpr(expr);
247
248        // We create a dataflow and optimize it, to determine if we can avoid building it.
249        // This can happen if the result optimizes to a constant, or to a `Get` expression
250        // around a maintained arrangement.
251        let typ = expr.typ();
252        let key = typ
253            .default_key()
254            .iter()
255            .map(|k| MirScalarExpr::column(*k))
256            .collect();
257
258        // The assembled dataflow contains a view and an index of that view.
259        let mut df_builder = {
260            let catalog = self.catalog.state();
261            let compute = self.compute_instance.clone();
262            DataflowBuilder::new(catalog, compute).with_config(&self.config)
263        };
264
265        let debug_name = format!("oneshot-select-{}", self.select_id);
266        let mut df_desc = MirDataflowDescription::new(debug_name.to_string());
267
268        df_builder.import_view_into_dataflow(
269            &self.select_id,
270            &expr,
271            &mut df_desc,
272            &self.config.features,
273        )?;
274        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
275
276        // Resolve all unmaterializable function calls except mz_now(), because
277        // we don't yet have a timestamp.
278        let style = ExprPrepStyle::OneShot {
279            logical_time: EvalTime::Deferred,
280            session,
281            catalog_state: self.catalog.state(),
282        };
283        df_desc.visit_children(
284            |r| prep_relation_expr(r, style),
285            |s| prep_scalar_expr(s, style),
286        )?;
287
288        // TODO: Instead of conditioning here we should really
289        // reconsider how to render multi-plan peek dataflows. The main
290        // difficulty here is rendering the optional finishing bit.
291        if self.config.mode != OptimizeMode::Explain {
292            df_desc.export_index(
293                self.index_id,
294                IndexDesc {
295                    on_id: self.select_id,
296                    key,
297                },
298                typ.clone(),
299            );
300        }
301
302        // Set the `as_of` and `until` timestamps for the dataflow.
303        df_desc.set_as_of(timestamp_ctx.antichain());
304
305        // Get the single timestamp representing the `as_of` time.
306        let as_of = df_desc
307            .as_of
308            .clone()
309            .expect("as_of antichain")
310            .into_option()
311            .expect("unique as_of element");
312
313        // Resolve all unmaterializable function calls including mz_now().
314        let style = ExprPrepStyle::OneShot {
315            logical_time: EvalTime::Time(as_of),
316            session,
317            catalog_state: self.catalog.state(),
318        };
319        df_desc.visit_children(
320            |r| prep_relation_expr(r, style),
321            |s| prep_scalar_expr(s, style),
322        )?;
323
324        // Use the opportunity to name an `until` frontier that will prevent
325        // work we needn't perform. By default, `until` will be
326        // `Antichain::new()`, which prevents no updates and is safe.
327        //
328        // If `timestamp_ctx.antichain()` is empty, `timestamp_ctx.timestamp()`
329        // will return `None` and we use the default (empty) `until`. Otherwise,
330        // we expect to be able to set `until = as_of + 1` without an overflow, unless
331        // we query at the maximum timestamp. In this case, the default empty `until`
332        // is the correct choice.
333        if let Some(until) = timestamp_ctx
334            .timestamp()
335            .and_then(Timestamp::try_step_forward)
336        {
337            df_desc.until = Antichain::from_elem(until);
338        }
339
340        // Construct TransformCtx for global optimization.
341        let mut transform_ctx = TransformCtx::global(
342            &df_builder,
343            &*stats,
344            &self.config.features,
345            &self.typecheck_ctx,
346            &self.repr_typecheck_ctx,
347            &mut df_meta,
348            Some(&self.metrics),
349        );
350
351        // Let's already try creating a fast path plan. If successful, we don't need to run the
352        // whole optimizer pipeline, but just a tiny subset of it. (But we'll need to run
353        // `create_fast_path_plan` later again, because, e.g., running `LiteralConstraints` is still
354        // ahead of us.)
355        let use_fast_path_optimizer = match create_fast_path_plan(
356            &mut df_desc,
357            self.select_id,
358            Some(&self.finishing),
359            self.config.features.persist_fast_path_limit,
360            self.config.persist_fast_path_order,
361        ) {
362            Ok(maybe_fast_path_plan) => maybe_fast_path_plan.is_some(),
363            Err(OptimizerError::InternalUnsafeMfpPlan(_)) => {
364                // This is expected, in that `create_fast_path_plan` can choke on `mz_now`, which we
365                // haven't removed yet.
366                false
367            }
368            Err(e) => {
369                return Err(e);
370            }
371        };
372
373        // Run global optimization.
374        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, use_fast_path_optimizer)?;
375
376        if self.config.mode == OptimizeMode::Explain {
377            // Collect the list of indexes used by the dataflow at this point.
378            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
379        }
380
381        // TODO: use the following code once we can be sure that the
382        // index_exports always exist.
383        //
384        // let typ = self.df_desc
385        //     .index_exports
386        //     .first_key_value()
387        //     .map(|(_key, (_desc, typ))| typ.clone())
388        //     .expect("GlobalMirPlan type");
389
390        let peek_plan = match create_fast_path_plan(
391            &mut df_desc,
392            self.select_id,
393            Some(&self.finishing),
394            self.config.features.persist_fast_path_limit,
395            self.config.persist_fast_path_order,
396        )? {
397            Some(plan) if !self.config.no_fast_path => {
398                if self.config.mode == OptimizeMode::Explain {
399                    // Trace the `used_indexes` for the FastPathPlan.
400                    debug_span!(target: "optimizer", "fast_path").in_scope(|| {
401                        // Fast path plans come with an updated finishing.
402                        let finishing = if !self.finishing.is_trivial(typ.arity()) {
403                            Some(&self.finishing)
404                        } else {
405                            None
406                        };
407                        trace_plan(&plan.used_indexes(finishing));
408                    });
409                }
410                // Trace the FastPathPlan.
411                trace_plan!(at: "fast_path", &plan);
412
413                // Trace the pipeline output under `optimize`.
414                trace_plan(&plan);
415
416                // Build the PeekPlan
417                PeekPlan::FastPath(plan)
418            }
419            _ => {
420                soft_assert_or_log!(
421                    !use_fast_path_optimizer || self.config.no_fast_path,
422                    "The fast_path_optimizer shouldn't make a fast path plan slow path."
423                );
424
425                // Ensure all expressions are normalized before finalizing.
426                for build in df_desc.objects_to_build.iter_mut() {
427                    normalize_lets(&mut build.plan.0, &self.config.features)?
428                }
429
430                // Finalize the dataflow. This includes:
431                // - MIR ⇒ LIR lowering
432                // - LIR ⇒ LIR transforms
433                let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
434
435                // Trace the pipeline output under `optimize`.
436                trace_plan(&df_desc);
437
438                // Build the PeekPlan
439                PeekPlan::SlowPath(PeekDataflowPlan::new(df_desc, self.index_id(), &typ))
440            }
441        };
442
443        self.duration += time.elapsed();
444        let label = match &peek_plan {
445            PeekPlan::FastPath(_) => "peek:fast_path",
446            PeekPlan::SlowPath(_) => "peek:slow_path",
447        };
448        self.metrics
449            .observe_e2e_optimization_time(label, self.duration);
450
451        Ok(GlobalLirPlan {
452            peek_plan,
453            df_meta,
454            typ,
455        })
456    }
457}
458
459impl GlobalLirPlan {
460    /// Unwraps the parts of the final result of the optimization pipeline.
461    pub fn unapply(self) -> (PeekPlan, DataflowMetainfo, SqlRelationType) {
462        (self.peek_plan, self.df_meta, self.typ)
463    }
464}