// mz_adapter/optimize/subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `SUBSCRIBE` statements.
11
12use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::collections::CollectionExt;
21use mz_ore::soft_assert_or_log;
22use mz_repr::{GlobalId, Timestamp};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_sql::plan::SubscribeFrom;
25use mz_transform::TransformCtx;
26use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
27use mz_transform::normalize_lets::normalize_lets;
28use mz_transform::reprtypecheck::{
29    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
30};
31use timely::progress::Antichain;
32
33use crate::CollectionIdBundle;
34use crate::optimize::dataflows::{
35    ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
36    dataflow_import_id_bundle,
37};
38use crate::optimize::{
39    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
40    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
41};
42
/// An optimizer for `SUBSCRIBE` statements.
///
/// Bundles everything needed to drive a subscribe dataflow through the
/// MIR and LIR optimization stages implemented by the `Optimize` impls below.
pub struct Optimizer {
    /// A representation typechecking context to use throughout the optimizer pipeline.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<dyn OptimizerCatalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used for the exported sink.
    sink_id: GlobalId,
    /// A transient GlobalId to be used when constructing a dataflow for
    /// `SUBSCRIBE FROM <SELECT>` variants.
    view_id: GlobalId,
    /// Should the plan produce an initial snapshot?
    with_snapshot: bool,
    /// Sink timestamp. When set, it becomes the single element of the sink's
    /// `up_to` frontier; when `None`, the default (empty) frontier is used.
    up_to: Option<Timestamp>,
    /// A human-readable name exposed internally (useful for debugging).
    debug_name: String,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    duration: Duration,
}
68
69// A bogey `Debug` implementation that hides fields. This is needed to make the
70// `event!` call in `sequence_peek_stage` not emit a lot of data.
71//
72// For now, we skip almost all fields, but we might revisit that bit if it turns
73// out that we really need those for debugging purposes.
74impl std::fmt::Debug for Optimizer {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("Optimizer")
77            .field("config", &self.config)
78            .finish_non_exhaustive()
79    }
80}
81
82impl Optimizer {
83    pub fn new(
84        catalog: Arc<dyn OptimizerCatalog>,
85        compute_instance: ComputeInstanceSnapshot,
86        view_id: GlobalId,
87        sink_id: GlobalId,
88        with_snapshot: bool,
89        up_to: Option<Timestamp>,
90        debug_name: String,
91        config: OptimizerConfig,
92        metrics: OptimizerMetrics,
93    ) -> Self {
94        Self {
95            repr_typecheck_ctx: empty_repr_context(),
96            catalog,
97            compute_instance,
98            view_id,
99            sink_id,
100            with_snapshot,
101            up_to,
102            debug_name,
103            config,
104            metrics,
105            duration: Default::default(),
106        }
107    }
108
109    pub fn cluster_id(&self) -> ComputeInstanceId {
110        self.compute_instance.instance_id()
111    }
112
113    pub fn up_to(&self) -> Option<Timestamp> {
114        self.up_to.clone()
115    }
116
117    pub fn sink_id(&self) -> GlobalId {
118        self.sink_id
119    }
120}
121
/// The (sealed intermediate) result after:
///
/// 1. embedding a [`SubscribeFrom`] plan into a [`MirDataflowDescription`],
/// 2. transitively inlining referenced views, and
/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    /// The dataflow description with jointly optimized MIR plans.
    df_desc: MirDataflowDescription,
    /// Metainformation collected while optimizing the dataflow.
    df_meta: DataflowMetainfo,
    /// Tracks the timestamp-resolution state ([`Unresolved`] or [`Resolved`])
    /// at the type level without storing any runtime data.
    phantom: PhantomData<T>,
}
133
impl<T: Clone> GlobalMirPlan<T> {
    /// Computes the [`CollectionIdBundle`] of the wrapped dataflow.
    ///
    /// Delegates to [`dataflow_import_id_bundle`] for the wrapped
    /// `df_desc` on the given compute instance.
    pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
        dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
    }
}
140
/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
/// `DataflowDescription` with `LIR` plans.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized dataflow description with LIR plans.
    df_desc: LirDataflowDescription,
    /// Metainformation collected while optimizing the dataflow.
    df_meta: DataflowMetainfo,
}
148
149impl GlobalLirPlan {
150    /// Returns the id of the dataflow's sink export.
151    ///
152    /// # Panics
153    ///
154    /// Panics if the dataflow has no sink exports or has more than one.
155    pub fn sink_id(&self) -> GlobalId {
156        self.df_desc.sink_id()
157    }
158
159    pub fn as_of(&self) -> Option<Timestamp> {
160        self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
161    }
162
163    /// Returns the description of the dataflow's sink export.
164    ///
165    /// # Panics
166    ///
167    /// Panics if the dataflow has no sink exports or has more than one.
168    pub fn sink_desc(&self) -> &ComputeSinkDesc {
169        let sink_exports = &self.df_desc.sink_exports;
170        let sink_desc = sink_exports.values().into_element();
171        sink_desc
172    }
173}
174
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result without a resolved timestamp.
///
/// Plans in this state must have `resolve()` called on them before they can
/// be fed into the MIR ⇒ LIR optimization stage.
#[derive(Clone, Debug)]
pub struct Unresolved;
179
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result with a resolved timestamp.
///
/// The actual timestamp value is set in the [`MirDataflowDescription`] of the
/// surrounding [`GlobalMirPlan`] when we call `resolve()`.
///
/// Only plans in this state are accepted by the MIR ⇒ LIR optimization stage.
#[derive(Clone, Debug)]
pub struct Resolved;
187
188impl Optimize<SubscribeFrom> for Optimizer {
189    type To = GlobalMirPlan<Unresolved>;
190
191    fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
192        let time = Instant::now();
193
194        let mut df_builder = {
195            let compute = self.compute_instance.clone();
196            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
197        };
198        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
199        let mut df_meta = DataflowMetainfo::default();
200
201        match plan {
202            SubscribeFrom::Id(from_id) => {
203                let from = self.catalog.get_entry(&from_id);
204                let from_desc = from
205                    .relation_desc()
206                    .expect("subscribes can only be run on items with descs")
207                    .into_owned();
208
209                df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
210                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
211
212                // Make SinkDesc
213                let subscribe_conn = SubscribeSinkConnection::default();
214                let sink_description = ComputeSinkDesc {
215                    from: from_id,
216                    from_desc,
217                    connection: ComputeSinkConnection::Subscribe(subscribe_conn),
218                    with_snapshot: self.with_snapshot,
219                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
220                    // No `FORCE NOT NULL` for subscribes
221                    non_null_assertions: vec![],
222                    // No `REFRESH` for subscribes
223                    refresh_schedule: None,
224                };
225                df_desc.export_sink(self.sink_id, sink_description);
226            }
227            SubscribeFrom::Query { expr, desc } => {
228                // TODO: Change the `expr` type to be `HirRelationExpr` and run
229                // HIR ⇒ MIR lowering and decorrelation here. This would allow
230                // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.`
231                //
232                // let typ = expr.top_level_typ();
233                // let expr = expr.lower(&self.config)?;
234
235                // MIR ⇒ MIR optimization (local)
236                let mut transform_ctx = TransformCtx::local(
237                    &self.config.features,
238                    &self.repr_typecheck_ctx,
239                    &mut df_meta,
240                    Some(&mut self.metrics),
241                    Some(self.view_id),
242                );
243                let expr = optimize_mir_local(expr, &mut transform_ctx)?;
244
245                df_builder.import_view_into_dataflow(
246                    &self.view_id,
247                    &expr,
248                    &mut df_desc,
249                    &self.config.features,
250                )?;
251                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
252
253                // Make SinkDesc
254                let subscribe_conn = SubscribeSinkConnection::default();
255                let sink_description = ComputeSinkDesc {
256                    from: self.view_id,
257                    from_desc: desc.clone(),
258                    connection: ComputeSinkConnection::Subscribe(subscribe_conn),
259                    with_snapshot: self.with_snapshot,
260                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
261                    // No `FORCE NOT NULL` for subscribes
262                    non_null_assertions: vec![],
263                    // No `REFRESH` for subscribes
264                    refresh_schedule: None,
265                };
266                df_desc.export_sink(self.sink_id, sink_description);
267            }
268        };
269
270        // Prepare expressions in the assembled dataflow.
271        let style = ExprPrepMaintained;
272        df_desc.visit_children(
273            |r| style.prep_relation_expr(r),
274            |s| style.prep_scalar_expr(s),
275        )?;
276
277        // Construct TransformCtx for global optimization.
278        let mut transform_ctx = TransformCtx::global(
279            &df_builder,
280            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
281            &self.config.features,
282            &self.repr_typecheck_ctx,
283            &mut df_meta,
284            Some(&mut self.metrics),
285        );
286        // Run global optimization.
287        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
288
289        if self.config.mode == OptimizeMode::Explain {
290            // Collect the list of indexes used by the dataflow at this point.
291            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
292        }
293
294        self.duration += time.elapsed();
295
296        // Return the (sealed) plan at the end of this optimization step.
297        Ok(GlobalMirPlan {
298            df_desc,
299            df_meta,
300            phantom: PhantomData::<Unresolved>,
301        })
302    }
303}
304
305impl GlobalMirPlan<Unresolved> {
306    /// Produces the [`GlobalMirPlan`] with [`Resolved`] timestamp.
307    ///
308    /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan`
309    /// optimization stage in order to profit from possible single-time
310    /// optimizations in the `Plan::finalize_dataflow` call.
311    pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
312        // A dataflow description for a `SUBSCRIBE` statement should not have
313        // index exports.
314        soft_assert_or_log!(
315            self.df_desc.index_exports.is_empty(),
316            "unexpectedly setting until for a DataflowDescription with an index",
317        );
318
319        // Set the `as_of` timestamp for the dataflow.
320        self.df_desc.set_as_of(as_of);
321
322        // The only outputs of the dataflow are sinks, so we might be able to
323        // turn off the computation early, if they all have non-trivial
324        // `up_to`s.
325        self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
326        for (_, sink) in &self.df_desc.sink_exports {
327            self.df_desc.until.join_assign(&sink.up_to);
328        }
329
330        GlobalMirPlan {
331            df_desc: self.df_desc,
332            df_meta: self.df_meta,
333            phantom: PhantomData::<Resolved>,
334        }
335    }
336}
337
impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
    type To = GlobalLirPlan;

    /// Lowers a timestamp-resolved global MIR plan to a final [`GlobalLirPlan`].
    ///
    /// Normalizes `Let` bindings in all objects to build, optionally elides
    /// snapshots, and finalizes the dataflow (MIR ⇒ LIR lowering plus
    /// LIR ⇒ LIR transforms).
    fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();

        let GlobalMirPlan {
            mut df_desc,
            df_meta,
            phantom: _,
        } = plan;

        // Ensure all expressions are normalized before finalizing.
        for build in df_desc.objects_to_build.iter_mut() {
            normalize_lets(&mut build.plan.0, &self.config.features)?
        }

        if self.config.subscribe_snapshot_optimization {
            // Determine whether we can elide any snapshots for this subscribe.
            optimize_dataflow_snapshot(&mut df_desc)?;
        }

        // Finalize the dataflow. This includes:
        // - MIR ⇒ LIR lowering
        // - LIR ⇒ LIR transforms
        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;

        // Account this stage's elapsed time into the cumulative optimization
        // duration and report the end-to-end total.
        self.duration += time.elapsed();
        self.metrics
            .observe_e2e_optimization_time("subscribe", self.duration);

        // Return the plan at the end of this `optimize` step.
        Ok(GlobalLirPlan { df_desc, df_meta })
    }
}
373
374impl GlobalLirPlan {
375    /// Unwraps the parts of the final result of the optimization pipeline.
376    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
377        (self.df_desc, self.df_meta)
378    }
379}