// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Optimizer implementation for `SUBSCRIBE` statements.

12use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::collections::CollectionExt;
21use mz_ore::soft_assert_or_log;
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::SubscribeFrom;
25use mz_transform::TransformCtx;
26use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
29use timely::progress::Antichain;
30
31use crate::CollectionIdBundle;
32use crate::optimize::dataflows::{
33    ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
34    dataflow_import_id_bundle,
35};
36use crate::optimize::{
37    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
38    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
39};
40
/// An optimizer for `SUBSCRIBE` statements.
///
/// Holds everything needed to run the two-stage optimization pipeline
/// (`SubscribeFrom` ⇒ `GlobalMirPlan` ⇒ `GlobalLirPlan`) for a single
/// `SUBSCRIBE`.
pub struct Optimizer {
    /// A typechecking context to use throughout the optimizer pipeline.
    typecheck_ctx: SharedTypecheckingContext,
    /// A snapshot of the catalog state.
    catalog: Arc<dyn OptimizerCatalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used for the exported sink.
    sink_id: GlobalId,
    /// A transient GlobalId to be used when constructing a dataflow for
    /// `SUBSCRIBE FROM <SELECT>` variants.
    view_id: GlobalId,
    /// Should the plan produce an initial snapshot?
    with_snapshot: bool,
    /// Sink timestamp: if set, the subscribe stops emitting updates at this
    /// time (becomes the sink's `up_to`).
    up_to: Option<Timestamp>,
    /// A human-readable name exposed internally (useful for debugging).
    debug_name: String,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far, accumulated across both
    /// `optimize` stages.
    duration: Duration,
}
66
67// A bogey `Debug` implementation that hides fields. This is needed to make the
68// `event!` call in `sequence_peek_stage` not emit a lot of data.
69//
70// For now, we skip almost all fields, but we might revisit that bit if it turns
71// out that we really need those for debugging purposes.
72impl std::fmt::Debug for Optimizer {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct("Optimizer")
75            .field("config", &self.config)
76            .finish_non_exhaustive()
77    }
78}
79
80impl Optimizer {
81    pub fn new(
82        catalog: Arc<dyn OptimizerCatalog>,
83        compute_instance: ComputeInstanceSnapshot,
84        view_id: GlobalId,
85        sink_id: GlobalId,
86        with_snapshot: bool,
87        up_to: Option<Timestamp>,
88        debug_name: String,
89        config: OptimizerConfig,
90        metrics: OptimizerMetrics,
91    ) -> Self {
92        Self {
93            typecheck_ctx: empty_typechecking_context(),
94            catalog,
95            compute_instance,
96            view_id,
97            sink_id,
98            with_snapshot,
99            up_to,
100            debug_name,
101            config,
102            metrics,
103            duration: Default::default(),
104        }
105    }
106
107    pub fn cluster_id(&self) -> ComputeInstanceId {
108        self.compute_instance.instance_id()
109    }
110
111    pub fn up_to(&self) -> Option<Timestamp> {
112        self.up_to.clone()
113    }
114
115    pub fn sink_id(&self) -> GlobalId {
116        self.sink_id
117    }
118}
119
/// The (sealed intermediate) result after:
///
/// 1. embedding a [`SubscribeFrom`] plan into a [`MirDataflowDescription`],
/// 2. transitively inlining referenced views, and
/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
///
/// The type parameter `T` is a marker ([`Unresolved`] or [`Resolved`]) that
/// tracks at the type level whether a timestamp has been resolved via
/// `resolve()`; it carries no runtime data.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    // The jointly optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    // Metadata (e.g. used indexes, notices) accumulated during optimization.
    df_meta: DataflowMetainfo,
    // Zero-sized marker for the timestamp-resolution state.
    phantom: PhantomData<T>,
}
131
132impl<T: Clone> GlobalMirPlan<T> {
133    /// Computes the [`CollectionIdBundle`] of the wrapped dataflow.
134    pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
135        dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
136    }
137}
138
/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
/// `DataflowDescription` with `LIR` plans.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    // The finalized LIR dataflow description, ready to be shipped to compute.
    df_desc: LirDataflowDescription,
    // Metadata accumulated during optimization.
    df_meta: DataflowMetainfo,
}
146
147impl GlobalLirPlan {
148    /// Returns the id of the dataflow's sink export.
149    ///
150    /// # Panics
151    ///
152    /// Panics if the dataflow has no sink exports or has more than one.
153    pub fn sink_id(&self) -> GlobalId {
154        self.df_desc.sink_id()
155    }
156
157    pub fn as_of(&self) -> Option<Timestamp> {
158        self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
159    }
160
161    /// Returns the description of the dataflow's sink export.
162    ///
163    /// # Panics
164    ///
165    /// Panics if the dataflow has no sink exports or has more than one.
166    pub fn sink_desc(&self) -> &ComputeSinkDesc {
167        let sink_exports = &self.df_desc.sink_exports;
168        let sink_desc = sink_exports.values().into_element();
169        sink_desc
170    }
171}
172
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result without a resolved timestamp.
#[derive(Clone, Debug)]
pub struct Unresolved;
177
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result with a resolved timestamp.
///
/// The actual timestamp value is set in the [`MirDataflowDescription`] of the
/// surrounding [`GlobalMirPlan`] when we call `resolve()`.
#[derive(Clone, Debug)]
pub struct Resolved;
185
impl Optimize<SubscribeFrom> for Optimizer {
    type To = GlobalMirPlan<Unresolved>;

    /// First optimization stage: embed the `SUBSCRIBE` source into a dataflow,
    /// inline referenced views, and run local + global MIR optimization. The
    /// result still lacks a resolved timestamp (hence `Unresolved`).
    fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let mut df_builder = {
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
        };
        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
        let mut df_meta = DataflowMetainfo::default();

        match plan {
            // `SUBSCRIBE <object>`: import the existing catalog item directly.
            SubscribeFrom::Id(from_id) => {
                let from = self.catalog.get_entry(&from_id);
                let from_desc = from
                    .relation_desc()
                    .expect("subscribes can only be run on items with descs")
                    .into_owned();

                df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

                // Make SinkDesc
                let subscribe_conn = SubscribeSinkConnection::default();
                let sink_description = ComputeSinkDesc {
                    from: from_id,
                    from_desc,
                    connection: ComputeSinkConnection::Subscribe(subscribe_conn),
                    with_snapshot: self.with_snapshot,
                    // An absent `up_to` becomes the empty antichain (run forever).
                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
                    // No `FORCE NOT NULL` for subscribes
                    non_null_assertions: vec![],
                    // No `REFRESH` for subscribes
                    refresh_schedule: None,
                };
                df_desc.export_sink(self.sink_id, sink_description);
            }
            // `SUBSCRIBE (<SELECT> ...)`: the query arrives as an already
            // lowered MIR expression and is installed as a transient view.
            SubscribeFrom::Query { expr, desc } => {
                // TODO: Change the `expr` type to be `HirRelationExpr` and run
                // HIR ⇒ MIR lowering and decorrelation here. This would allow
                // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.`
                //
                // let typ = expr.top_level_typ();
                // let expr = expr.lower(&self.config)?;

                // MIR ⇒ MIR optimization (local)
                let mut transform_ctx = TransformCtx::local(
                    &self.config.features,
                    &self.typecheck_ctx,
                    &mut df_meta,
                    Some(&mut self.metrics),
                    Some(self.view_id),
                );
                let expr = optimize_mir_local(expr, &mut transform_ctx)?;

                df_builder.import_view_into_dataflow(
                    &self.view_id,
                    &expr,
                    &mut df_desc,
                    &self.config.features,
                )?;
                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

                // Make SinkDesc
                let subscribe_conn = SubscribeSinkConnection::default();
                let sink_description = ComputeSinkDesc {
                    from: self.view_id,
                    from_desc: desc.clone(),
                    connection: ComputeSinkConnection::Subscribe(subscribe_conn),
                    with_snapshot: self.with_snapshot,
                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
                    // No `FORCE NOT NULL` for subscribes
                    non_null_assertions: vec![],
                    // No `REFRESH` for subscribes
                    refresh_schedule: None,
                };
                df_desc.export_sink(self.sink_id, sink_description);
            }
        };

        // Prepare expressions in the assembled dataflow.
        let style = ExprPrepMaintained;
        df_desc.visit_children(
            |r| style.prep_relation_expr(r),
            |s| style.prep_scalar_expr(s),
        )?;

        // Construct TransformCtx for global optimization.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
            &self.config.features,
            &self.typecheck_ctx,
            &mut df_meta,
            Some(&mut self.metrics),
        );
        // Run global optimization.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        if self.config.mode == OptimizeMode::Explain {
            // Collect the list of indexes used by the dataflow at this point.
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        self.duration += time.elapsed();

        // Return the (sealed) plan at the end of this optimization step.
        Ok(GlobalMirPlan {
            df_desc,
            df_meta,
            phantom: PhantomData::<Unresolved>,
        })
    }
}
302
303impl GlobalMirPlan<Unresolved> {
304    /// Produces the [`GlobalMirPlan`] with [`Resolved`] timestamp.
305    ///
306    /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan`
307    /// optimization stage in order to profit from possible single-time
308    /// optimizations in the `Plan::finalize_dataflow` call.
309    pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
310        // A dataflow description for a `SUBSCRIBE` statement should not have
311        // index exports.
312        soft_assert_or_log!(
313            self.df_desc.index_exports.is_empty(),
314            "unexpectedly setting until for a DataflowDescription with an index",
315        );
316
317        // Set the `as_of` timestamp for the dataflow.
318        self.df_desc.set_as_of(as_of);
319
320        // The only outputs of the dataflow are sinks, so we might be able to
321        // turn off the computation early, if they all have non-trivial
322        // `up_to`s.
323        self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
324        for (_, sink) in &self.df_desc.sink_exports {
325            self.df_desc.until.join_assign(&sink.up_to);
326        }
327
328        GlobalMirPlan {
329            df_desc: self.df_desc,
330            df_meta: self.df_meta,
331            phantom: PhantomData::<Resolved>,
332        }
333    }
334}
335
impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
    type To = GlobalLirPlan;

    /// Second optimization stage: lower the timestamp-resolved MIR plan to LIR
    /// and finalize the dataflow description.
    fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let GlobalMirPlan {
            mut df_desc,
            df_meta,
            phantom: _,
        } = plan;

        // Ensure all expressions are normalized before finalizing.
        for build in df_desc.objects_to_build.iter_mut() {
            normalize_lets(&mut build.plan.0, &self.config.features)?
        }

        if self.config.subscribe_snapshot_optimization {
            // Determine whether we can elide any snapshots for this subscribe.
            optimize_dataflow_snapshot(&mut df_desc)?;
        }

        // Finalize the dataflow. This includes:
        // - MIR ⇒ LIR lowering
        // - LIR ⇒ LIR transforms
        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

        // Record the end-to-end optimization time across both stages
        // (`duration` already contains the first stage's elapsed time).
        self.duration += time.elapsed();
        self.metrics
            .observe_e2e_optimization_time("subscribe", self.duration);

        // Return the plan at the end of this `optimize` step.
        Ok(GlobalLirPlan { df_desc, df_meta })
    }
}
371
372impl GlobalLirPlan {
373    /// Unwraps the parts of the final result of the optimization pipeline.
374    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
375        (self.df_desc, self.df_meta)
376    }
377}