mz_adapter/optimize/
subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `SUBSCRIBE` statements.
11
12use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_adapter_types::connection::ConnectionId;
18use mz_compute_types::ComputeInstanceId;
19use mz_compute_types::plan::Plan;
20use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_or_log;
23use mz_repr::{GlobalId, RelationDesc, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::SubscribeFrom;
26use mz_transform::TransformCtx;
27use mz_transform::dataflow::DataflowMetainfo;
28use mz_transform::normalize_lets::normalize_lets;
29use mz_transform::reprtypecheck::{
30    SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
31};
32use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
33use timely::progress::Antichain;
34
35use crate::CollectionIdBundle;
36use crate::optimize::dataflows::{
37    ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, dataflow_import_id_bundle,
38    prep_relation_expr, prep_scalar_expr,
39};
40use crate::optimize::{
41    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
42    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
43};
44
45pub struct Optimizer {
46    /// A typechecking context to use throughout the optimizer pipeline.
47    typecheck_ctx: TypecheckContext,
48    /// A representation typechecking context to use throughout the optimizer pipeline.
49    repr_typecheck_ctx: ReprTypecheckContext,
50    /// A snapshot of the catalog state.
51    catalog: Arc<dyn OptimizerCatalog>,
52    /// A snapshot of the cluster that will run the dataflows.
53    compute_instance: ComputeInstanceSnapshot,
54    /// A transient GlobalId to be used for the exported sink.
55    sink_id: GlobalId,
56    /// A transient GlobalId to be used when constructing a dataflow for
57    /// `SUBSCRIBE FROM <SELECT>` variants.
58    view_id: GlobalId,
59    /// The id of the session connection in which the optimizer will run.
60    conn_id: Option<ConnectionId>,
61    /// Should the plan produce an initial snapshot?
62    with_snapshot: bool,
63    /// Sink timestamp.
64    up_to: Option<Timestamp>,
65    /// A human-readable name exposed internally (useful for debugging).
66    debug_name: String,
67    /// Optimizer config.
68    config: OptimizerConfig,
69    /// Optimizer metrics.
70    metrics: OptimizerMetrics,
71    /// The time spent performing optimization so far.
72    duration: Duration,
73}
74
75// A bogey `Debug` implementation that hides fields. This is needed to make the
76// `event!` call in `sequence_peek_stage` not emit a lot of data.
77//
78// For now, we skip almost all fields, but we might revisit that bit if it turns
79// out that we really need those for debugging purposes.
80impl std::fmt::Debug for Optimizer {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("Optimizer")
83            .field("config", &self.config)
84            .finish_non_exhaustive()
85    }
86}
87
88impl Optimizer {
89    pub fn new(
90        catalog: Arc<dyn OptimizerCatalog>,
91        compute_instance: ComputeInstanceSnapshot,
92        view_id: GlobalId,
93        sink_id: GlobalId,
94        conn_id: Option<ConnectionId>,
95        with_snapshot: bool,
96        up_to: Option<Timestamp>,
97        debug_name: String,
98        config: OptimizerConfig,
99        metrics: OptimizerMetrics,
100    ) -> Self {
101        Self {
102            typecheck_ctx: empty_context(),
103            repr_typecheck_ctx: empty_repr_context(),
104            catalog,
105            compute_instance,
106            view_id,
107            sink_id,
108            conn_id,
109            with_snapshot,
110            up_to,
111            debug_name,
112            config,
113            metrics,
114            duration: Default::default(),
115        }
116    }
117
118    pub fn cluster_id(&self) -> ComputeInstanceId {
119        self.compute_instance.instance_id()
120    }
121
122    pub fn up_to(&self) -> Option<Timestamp> {
123        self.up_to.clone()
124    }
125}
126
127/// The (sealed intermediate) result after:
128///
129/// 1. embedding a [`SubscribeFrom`] plan into a [`MirDataflowDescription`],
130/// 2. transitively inlining referenced views, and
131/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
132#[derive(Clone, Debug)]
133pub struct GlobalMirPlan<T: Clone> {
134    df_desc: MirDataflowDescription,
135    df_meta: DataflowMetainfo,
136    phantom: PhantomData<T>,
137}
138
139impl<T: Clone> GlobalMirPlan<T> {
140    /// Computes the [`CollectionIdBundle`] of the wrapped dataflow.
141    pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
142        dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
143    }
144}
145
146/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
147/// `DataflowDescription` with `LIR` plans.
148#[derive(Clone, Debug)]
149pub struct GlobalLirPlan {
150    df_desc: LirDataflowDescription,
151    df_meta: DataflowMetainfo,
152}
153
154impl GlobalLirPlan {
155    /// Returns the id of the dataflow's sink export.
156    ///
157    /// # Panics
158    ///
159    /// Panics if the dataflow has no sink exports or has more than one.
160    pub fn sink_id(&self) -> GlobalId {
161        self.df_desc.sink_id()
162    }
163
164    pub fn as_of(&self) -> Option<Timestamp> {
165        self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
166    }
167
168    /// Returns the description of the dataflow's sink export.
169    ///
170    /// # Panics
171    ///
172    /// Panics if the dataflow has no sink exports or has more than one.
173    pub fn sink_desc(&self) -> &ComputeSinkDesc {
174        let sink_exports = &self.df_desc.sink_exports;
175        let sink_desc = sink_exports.values().into_element();
176        sink_desc
177    }
178}
179
/// Type-level marker for a [`GlobalMirPlan`] whose timestamp has not been
/// resolved yet.
#[derive(Clone, Debug)]
pub struct Unresolved;
184
/// Type-level marker for a [`GlobalMirPlan`] whose timestamp has been
/// resolved.
///
/// The marker itself carries no data: the timestamp value lives in the
/// [`MirDataflowDescription`] of the surrounding [`GlobalMirPlan`] and is set
/// when `resolve()` is called.
#[derive(Clone, Debug)]
pub struct Resolved;
192
193impl Optimize<SubscribeFrom> for Optimizer {
194    type To = GlobalMirPlan<Unresolved>;
195
196    fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
197        let time = Instant::now();
198
199        let mut df_builder = {
200            let compute = self.compute_instance.clone();
201            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
202        };
203        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
204        let mut df_meta = DataflowMetainfo::default();
205
206        match plan {
207            SubscribeFrom::Id(from_id) => {
208                let from = self.catalog.get_entry(&from_id);
209                let from_desc = from
210                    .desc(
211                        &self
212                            .catalog
213                            .resolve_full_name(from.name(), self.conn_id.as_ref()),
214                    )
215                    .expect("subscribes can only be run on items with descs")
216                    .into_owned();
217
218                df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
219                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
220
221                // Make SinkDesc
222                let sink_description = ComputeSinkDesc {
223                    from: from_id,
224                    from_desc,
225                    connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
226                    with_snapshot: self.with_snapshot,
227                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
228                    // No `FORCE NOT NULL` for subscribes
229                    non_null_assertions: vec![],
230                    // No `REFRESH` for subscribes
231                    refresh_schedule: None,
232                };
233                df_desc.export_sink(self.sink_id, sink_description);
234            }
235            SubscribeFrom::Query { expr, desc } => {
236                // TODO: Change the `expr` type to be `HirRelationExpr` and run
237                // HIR ⇒ MIR lowering and decorrelation here. This would allow
238                // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.`
239                //
240                // let expr = expr.lower(&self.config)?;
241
242                // MIR ⇒ MIR optimization (local)
243                let mut transform_ctx = TransformCtx::local(
244                    &self.config.features,
245                    &self.typecheck_ctx,
246                    &self.repr_typecheck_ctx,
247                    &mut df_meta,
248                    Some(&mut self.metrics),
249                    Some(self.view_id),
250                );
251                let expr = optimize_mir_local(expr, &mut transform_ctx)?;
252
253                df_builder.import_view_into_dataflow(
254                    &self.view_id,
255                    &expr,
256                    &mut df_desc,
257                    &self.config.features,
258                )?;
259                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
260
261                // Make SinkDesc
262                let sink_description = ComputeSinkDesc {
263                    from: self.view_id,
264                    from_desc: RelationDesc::new(expr.typ(), desc.iter_names()),
265                    connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
266                    with_snapshot: self.with_snapshot,
267                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
268                    // No `FORCE NOT NULL` for subscribes
269                    non_null_assertions: vec![],
270                    // No `REFRESH` for subscribes
271                    refresh_schedule: None,
272                };
273                df_desc.export_sink(self.sink_id, sink_description);
274            }
275        };
276
277        // Prepare expressions in the assembled dataflow.
278        let style = ExprPrepStyle::Maintained;
279        df_desc.visit_children(
280            |r| prep_relation_expr(r, style),
281            |s| prep_scalar_expr(s, style),
282        )?;
283
284        // Construct TransformCtx for global optimization.
285        let mut transform_ctx = TransformCtx::global(
286            &df_builder,
287            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
288            &self.config.features,
289            &self.typecheck_ctx,
290            &self.repr_typecheck_ctx,
291            &mut df_meta,
292            Some(&mut self.metrics),
293        );
294        // Run global optimization.
295        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
296
297        if self.config.mode == OptimizeMode::Explain {
298            // Collect the list of indexes used by the dataflow at this point.
299            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
300        }
301
302        self.duration += time.elapsed();
303
304        // Return the (sealed) plan at the end of this optimization step.
305        Ok(GlobalMirPlan {
306            df_desc,
307            df_meta,
308            phantom: PhantomData::<Unresolved>,
309        })
310    }
311}
312
313impl GlobalMirPlan<Unresolved> {
314    /// Produces the [`GlobalMirPlan`] with [`Resolved`] timestamp.
315    ///
316    /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan`
317    /// optimization stage in order to profit from possible single-time
318    /// optimizations in the `Plan::finalize_dataflow` call.
319    pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
320        // A dataflow description for a `SUBSCRIBE` statement should not have
321        // index exports.
322        soft_assert_or_log!(
323            self.df_desc.index_exports.is_empty(),
324            "unexpectedly setting until for a DataflowDescription with an index",
325        );
326
327        // Set the `as_of` timestamp for the dataflow.
328        self.df_desc.set_as_of(as_of);
329
330        // The only outputs of the dataflow are sinks, so we might be able to
331        // turn off the computation early, if they all have non-trivial
332        // `up_to`s.
333        self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
334        for (_, sink) in &self.df_desc.sink_exports {
335            self.df_desc.until.join_assign(&sink.up_to);
336        }
337
338        GlobalMirPlan {
339            df_desc: self.df_desc,
340            df_meta: self.df_meta,
341            phantom: PhantomData::<Resolved>,
342        }
343    }
344}
345
346impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
347    type To = GlobalLirPlan;
348
349    fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
350        let time = Instant::now();
351
352        let GlobalMirPlan {
353            mut df_desc,
354            df_meta,
355            phantom: _,
356        } = plan;
357
358        // Ensure all expressions are normalized before finalizing.
359        for build in df_desc.objects_to_build.iter_mut() {
360            normalize_lets(&mut build.plan.0, &self.config.features)?
361        }
362
363        // Finalize the dataflow. This includes:
364        // - MIR ⇒ LIR lowering
365        // - LIR ⇒ LIR transforms
366        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
367
368        self.duration += time.elapsed();
369        self.metrics
370            .observe_e2e_optimization_time("subscribe", self.duration);
371
372        // Return the plan at the end of this `optimize` step.
373        Ok(GlobalLirPlan { df_desc, df_meta })
374    }
375}
376
377impl GlobalLirPlan {
378    /// Unwraps the parts of the final result of the optimization pipeline.
379    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
380        (self.df_desc, self.df_meta)
381    }
382}