// mz_adapter/optimize/subscribe.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `SUBSCRIBE` statements.
11
12use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_compute_types::ComputeInstanceId;
18use mz_compute_types::plan::Plan;
19use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
20use mz_ore::soft_assert_or_log;
21use mz_repr::{GlobalId, Timestamp};
22use mz_sql::optimizer_metrics::OptimizerMetrics;
23use mz_sql::plan::{HirToMirConfig, SubscribeFrom, SubscribePlan};
24use mz_transform::TransformCtx;
25use mz_transform::dataflow::{DataflowMetainfo, optimize_dataflow_snapshot};
26use mz_transform::normalize_lets::normalize_lets;
27use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
28use timely::progress::Antichain;
29
30use crate::CollectionIdBundle;
31use crate::optimize::dataflows::{
32    ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
33    dataflow_import_id_bundle,
34};
35use crate::optimize::{
36    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
37    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
38};
39
/// An optimizer pipeline for `SUBSCRIBE` statements.
///
/// Bundles the catalog/cluster snapshots and configuration needed to turn a
/// [`SubscribePlan`] into a dataflow description exporting a subscribe sink.
pub struct Optimizer {
    /// A typechecking context to use throughout the optimizer pipeline.
    typecheck_ctx: SharedTypecheckingContext,
    /// A snapshot of the catalog state.
    catalog: Arc<dyn OptimizerCatalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used for the exported sink.
    sink_id: GlobalId,
    /// A transient GlobalId to be used when constructing a dataflow for
    /// `SUBSCRIBE FROM <SELECT>` variants.
    view_id: GlobalId,
    /// Should the plan produce an initial snapshot?
    with_snapshot: bool,
    /// Optional sink timestamp: if set, it becomes the sink's `up_to`
    /// frontier (see the `ComputeSinkDesc` construction in `optimize`).
    up_to: Option<Timestamp>,
    /// A human-readable name exposed internally (useful for debugging).
    debug_name: String,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far.
    duration: Duration,
}
65
66// A bogey `Debug` implementation that hides fields. This is needed to make the
67// `event!` call in `sequence_peek_stage` not emit a lot of data.
68//
69// For now, we skip almost all fields, but we might revisit that bit if it turns
70// out that we really need those for debugging purposes.
71impl std::fmt::Debug for Optimizer {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("Optimizer")
74            .field("config", &self.config)
75            .finish_non_exhaustive()
76    }
77}
78
79impl Optimizer {
80    pub fn new(
81        catalog: Arc<dyn OptimizerCatalog>,
82        compute_instance: ComputeInstanceSnapshot,
83        view_id: GlobalId,
84        sink_id: GlobalId,
85        with_snapshot: bool,
86        up_to: Option<Timestamp>,
87        debug_name: String,
88        config: OptimizerConfig,
89        metrics: OptimizerMetrics,
90    ) -> Self {
91        Self {
92            typecheck_ctx: empty_typechecking_context(),
93            catalog,
94            compute_instance,
95            view_id,
96            sink_id,
97            with_snapshot,
98            up_to,
99            debug_name,
100            config,
101            metrics,
102            duration: Default::default(),
103        }
104    }
105
106    pub fn cluster_id(&self) -> ComputeInstanceId {
107        self.compute_instance.instance_id()
108    }
109
110    pub fn up_to(&self) -> Option<Timestamp> {
111        self.up_to.clone()
112    }
113
114    pub fn sink_id(&self) -> GlobalId {
115        self.sink_id
116    }
117}
118
/// The (sealed intermediate) result after:
///
/// 1. embedding a [`SubscribeFrom`] plan into a [`MirDataflowDescription`],
/// 2. transitively inlining referenced views, and
/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    /// The dataflow description holding the optimized `MIR` plans.
    df_desc: MirDataflowDescription,
    /// Metadata accumulated while optimizing the dataflow.
    df_meta: DataflowMetainfo,
    /// Encodes the timestamp-resolution state ([`Unresolved`] vs
    /// [`Resolved`]) at the type level without storing any data.
    phantom: PhantomData<T>,
}
130
131impl<T: Clone> GlobalMirPlan<T> {
132    /// Computes the [`CollectionIdBundle`] of the wrapped dataflow.
133    pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
134        dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
135    }
136}
137
/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
/// `DataflowDescription` with `LIR` plans.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized dataflow description with `LIR` plans.
    df_desc: LirDataflowDescription,
    /// Metadata accumulated while optimizing the dataflow.
    df_meta: DataflowMetainfo,
}
145
146impl GlobalLirPlan {
147    /// Returns the id of the dataflow's sink export.
148    ///
149    /// # Panics
150    ///
151    /// Panics if the dataflow has no sink exports or has more than one.
152    pub fn sink_id(&self) -> GlobalId {
153        self.df_desc.sink_id()
154    }
155}
156
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result without a resolved timestamp (i.e., before `resolve()` has been
/// called on the plan).
#[derive(Clone, Debug)]
pub struct Unresolved;
161
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result with a resolved timestamp.
///
/// The actual timestamp value is set in the [`MirDataflowDescription`] of the
/// surrounding [`GlobalMirPlan`] when we call `resolve()`.
#[derive(Clone, Debug)]
pub struct Resolved;
169
170impl Optimize<SubscribePlan> for Optimizer {
171    type To = GlobalMirPlan<Unresolved>;
172
173    fn optimize(&mut self, plan: SubscribePlan) -> Result<Self::To, OptimizerError> {
174        let output = plan.output;
175        let plan = plan.from;
176        let time = Instant::now();
177
178        let mut df_builder = {
179            let compute = self.compute_instance.clone();
180            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
181        };
182        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
183        let mut df_meta = DataflowMetainfo::default();
184
185        match plan {
186            SubscribeFrom::Id(from_id) => {
187                let from = self.catalog.get_entry(&from_id);
188                let from_desc = from
189                    .relation_desc()
190                    .expect("subscribes can only be run on items with descs")
191                    .into_owned();
192
193                df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
194                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
195
196                // Make SinkDesc
197                let sink_description = ComputeSinkDesc {
198                    from: from_id,
199                    from_desc,
200                    connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection {
201                        output: output.row_order().to_vec(),
202                    }),
203                    with_snapshot: self.with_snapshot,
204                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
205                    // No `FORCE NOT NULL` for subscribes
206                    non_null_assertions: vec![],
207                    // No `REFRESH` for subscribes
208                    refresh_schedule: None,
209                };
210                df_desc.export_sink(self.sink_id, sink_description);
211            }
212            SubscribeFrom::Query { expr, desc } => {
213                // TODO: Change the `expr` type to be `HirRelationExpr` and run
214                // HIR ⇒ MIR lowering and decorrelation here. This would allow
215                // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.`
216                //
217                // let typ = expr.top_level_typ();
218                // let expr = expr.lower(&self.config)?;
219
220                // MIR ⇒ MIR optimization (local)
221                let mut transform_ctx = TransformCtx::local(
222                    &self.config.features,
223                    &self.typecheck_ctx,
224                    &mut df_meta,
225                    Some(&mut self.metrics),
226                    Some(self.view_id),
227                );
228
229                let expr = expr.lower(HirToMirConfig::from(&self.config), None)?;
230                let expr = optimize_mir_local(expr, &mut transform_ctx)?;
231
232                df_builder.import_view_into_dataflow(
233                    &self.view_id,
234                    &expr,
235                    &mut df_desc,
236                    &self.config.features,
237                )?;
238                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
239
240                // Make SinkDesc
241                let sink_description = ComputeSinkDesc {
242                    from: self.view_id,
243                    from_desc: desc.clone(),
244                    connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection {
245                        output: output.row_order().to_vec(),
246                    }),
247                    with_snapshot: self.with_snapshot,
248                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
249                    // No `FORCE NOT NULL` for subscribes
250                    non_null_assertions: vec![],
251                    // No `REFRESH` for subscribes
252                    refresh_schedule: None,
253                };
254                df_desc.export_sink(self.sink_id, sink_description);
255            }
256        };
257
258        // Prepare expressions in the assembled dataflow.
259        let style = ExprPrepMaintained;
260        df_desc.visit_children(
261            |r| style.prep_relation_expr(r),
262            |s| style.prep_scalar_expr(s),
263        )?;
264
265        // Construct TransformCtx for global optimization.
266        let mut transform_ctx = TransformCtx::global(
267            &df_builder,
268            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
269            &self.config.features,
270            &self.typecheck_ctx,
271            &mut df_meta,
272            Some(&mut self.metrics),
273        );
274        // Run global optimization.
275        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
276
277        if self.config.mode == OptimizeMode::Explain {
278            // Collect the list of indexes used by the dataflow at this point.
279            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
280        }
281
282        self.duration += time.elapsed();
283
284        // Return the (sealed) plan at the end of this optimization step.
285        Ok(GlobalMirPlan {
286            df_desc,
287            df_meta,
288            phantom: PhantomData::<Unresolved>,
289        })
290    }
291}
292
impl GlobalMirPlan<Unresolved> {
    /// Produces the [`GlobalMirPlan`] with [`Resolved`] timestamp.
    ///
    /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan`
    /// optimization stage in order to profit from possible single-time
    /// optimizations in the `Plan::finalize_dataflow` call.
    pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
        // A dataflow description for a `SUBSCRIBE` statement should not have
        // index exports.
        soft_assert_or_log!(
            self.df_desc.index_exports.is_empty(),
            "unexpectedly setting until for a DataflowDescription with an index",
        );

        // Set the `as_of` timestamp for the dataflow.
        self.df_desc.set_as_of(as_of);

        // The only outputs of the dataflow are sinks, so we might be able to
        // turn off the computation early, if they all have non-trivial
        // `up_to`s.
        //
        // We start from the minimum frontier (the identity element for the
        // lattice `join`) and fold in each sink's `up_to`. NOTE(review): a
        // sink with an unbounded (empty) `up_to` frontier should join to the
        // empty frontier, leaving `until` unbounded as well — confirm against
        // the `Lattice` impl for `Antichain`.
        self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
        for (_, sink) in &self.df_desc.sink_exports {
            self.df_desc.until.join_assign(&sink.up_to);
        }

        GlobalMirPlan {
            df_desc: self.df_desc,
            df_meta: self.df_meta,
            phantom: PhantomData::<Resolved>,
        }
    }
}
325
326impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
327    type To = GlobalLirPlan;
328
329    fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
330        let time = Instant::now();
331
332        let GlobalMirPlan {
333            mut df_desc,
334            df_meta,
335            phantom: _,
336        } = plan;
337
338        // Ensure all expressions are normalized before finalizing.
339        for build in df_desc.objects_to_build.iter_mut() {
340            normalize_lets(&mut build.plan.0, &self.config.features)?
341        }
342
343        if self.config.subscribe_snapshot_optimization {
344            // Determine whether we can elide any snapshots for this subscribe.
345            optimize_dataflow_snapshot(&mut df_desc)?;
346        }
347
348        // Finalize the dataflow. This includes:
349        // - MIR ⇒ LIR lowering
350        // - LIR ⇒ LIR transforms
351        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
352
353        self.duration += time.elapsed();
354        self.metrics
355            .observe_e2e_optimization_time("subscribe", self.duration);
356
357        // Return the plan at the end of this `optimize` step.
358        Ok(GlobalLirPlan { df_desc, df_meta })
359    }
360}
361
362impl GlobalLirPlan {
363    /// Unwraps the parts of the final result of the optimization pipeline.
364    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
365        (self.df_desc, self.df_meta)
366    }
367}