mz_adapter/optimize/
subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Optimizer implementation for `SUBSCRIBE` statements.
11
12use std::marker::PhantomData;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use differential_dataflow::lattice::Lattice;
17use mz_adapter_types::connection::ConnectionId;
18use mz_compute_types::ComputeInstanceId;
19use mz_compute_types::plan::Plan;
20use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, SubscribeSinkConnection};
21use mz_ore::collections::CollectionExt;
22use mz_ore::soft_assert_or_log;
23use mz_repr::{GlobalId, RelationDesc, Timestamp};
24use mz_sql::optimizer_metrics::OptimizerMetrics;
25use mz_sql::plan::SubscribeFrom;
26use mz_transform::TransformCtx;
27use mz_transform::dataflow::DataflowMetainfo;
28use mz_transform::normalize_lets::normalize_lets;
29use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
30use timely::progress::Antichain;
31
32use crate::CollectionIdBundle;
33use crate::optimize::dataflows::{
34    ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, dataflow_import_id_bundle,
35    prep_relation_expr, prep_scalar_expr,
36};
37use crate::optimize::{
38    LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
39    OptimizerConfig, OptimizerError, optimize_mir_local, trace_plan,
40};
41
/// Optimizer state for a single `SUBSCRIBE` statement.
///
/// The same instance drives both optimization stages implemented in this
/// file (`SubscribeFrom` ⇒ `GlobalMirPlan<Unresolved>` and
/// `GlobalMirPlan<Resolved>` ⇒ `GlobalLirPlan`) and accumulates the time spent
/// across them in `duration`.
pub struct Optimizer {
    /// A typechecking context to use throughout the optimizer pipeline.
    /// It is handed to both the local and the global `TransformCtx`.
    typecheck_ctx: TypecheckContext,
    /// A snapshot of the catalog state.
    catalog: Arc<dyn OptimizerCatalog>,
    /// A snapshot of the cluster that will run the dataflows.
    compute_instance: ComputeInstanceSnapshot,
    /// A transient GlobalId to be used for the exported sink.
    sink_id: GlobalId,
    /// A transient GlobalId to be used when constructing a dataflow for
    /// `SUBSCRIBE FROM <SELECT>` variants (i.e. `SubscribeFrom::Query`), where
    /// it identifies the imported view and is what the sink reads from.
    view_id: GlobalId,
    /// The id of the session connection in which the optimizer will run.
    /// Used when resolving the full name of the subscribed-to catalog item.
    conn_id: Option<ConnectionId>,
    /// Should the plan produce an initial snapshot?
    with_snapshot: bool,
    /// Optional `up_to` timestamp of the sink. It is copied into the
    /// `up_to` frontier of the exported `ComputeSinkDesc`; when `None`, the
    /// default antichain is used instead.
    up_to: Option<Timestamp>,
    /// A human-readable name exposed internally (useful for debugging).
    /// Used as the name of the constructed dataflow description.
    debug_name: String,
    /// Optimizer config.
    config: OptimizerConfig,
    /// Optimizer metrics.
    metrics: OptimizerMetrics,
    /// The time spent performing optimization so far, summed over all
    /// `optimize` calls made on this instance.
    duration: Duration,
}
69
70// A bogey `Debug` implementation that hides fields. This is needed to make the
71// `event!` call in `sequence_peek_stage` not emit a lot of data.
72//
73// For now, we skip almost all fields, but we might revisit that bit if it turns
74// out that we really need those for debugging purposes.
75impl std::fmt::Debug for Optimizer {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("Optimizer")
78            .field("config", &self.config)
79            .finish_non_exhaustive()
80    }
81}
82
83impl Optimizer {
84    pub fn new(
85        catalog: Arc<dyn OptimizerCatalog>,
86        compute_instance: ComputeInstanceSnapshot,
87        view_id: GlobalId,
88        sink_id: GlobalId,
89        conn_id: Option<ConnectionId>,
90        with_snapshot: bool,
91        up_to: Option<Timestamp>,
92        debug_name: String,
93        config: OptimizerConfig,
94        metrics: OptimizerMetrics,
95    ) -> Self {
96        Self {
97            typecheck_ctx: empty_context(),
98            catalog,
99            compute_instance,
100            view_id,
101            sink_id,
102            conn_id,
103            with_snapshot,
104            up_to,
105            debug_name,
106            config,
107            metrics,
108            duration: Default::default(),
109        }
110    }
111
112    pub fn cluster_id(&self) -> ComputeInstanceId {
113        self.compute_instance.instance_id()
114    }
115
116    pub fn up_to(&self) -> Option<Timestamp> {
117        self.up_to.clone()
118    }
119}
120
/// The (sealed intermediate) result after:
///
/// 1. embedding a [`SubscribeFrom`] plan into a [`MirDataflowDescription`],
/// 2. transitively inlining referenced views, and
/// 3. jointly optimizing the `MIR` plans in the [`MirDataflowDescription`].
///
/// The type parameter `T` is a marker ([`Unresolved`] or [`Resolved`]) that
/// records whether the `as_of` timestamp of the dataflow has been set yet.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan<T: Clone> {
    /// The dataflow description holding the `MIR` plans.
    df_desc: MirDataflowDescription,
    /// Metadata collected while optimizing the dataflow.
    df_meta: DataflowMetainfo,
    /// Zero-sized marker carrying the timestamp-resolution state `T`.
    phantom: PhantomData<T>,
}
132
133impl<T: Clone> GlobalMirPlan<T> {
134    /// Computes the [`CollectionIdBundle`] of the wrapped dataflow.
135    pub fn id_bundle(&self, compute_instance_id: ComputeInstanceId) -> CollectionIdBundle {
136        dataflow_import_id_bundle(&self.df_desc, compute_instance_id)
137    }
138}
139
/// The (final) result after MIR ⇒ LIR lowering and optimizing the resulting
/// `DataflowDescription` with `LIR` plans.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized dataflow description holding the `LIR` plans.
    df_desc: LirDataflowDescription,
    /// Metadata collected while optimizing the dataflow; carried over
    /// unchanged from the preceding [`GlobalMirPlan`].
    df_meta: DataflowMetainfo,
}
147
148impl GlobalLirPlan {
149    pub fn sink_id(&self) -> GlobalId {
150        let sink_exports = &self.df_desc.sink_exports;
151        let sink_id = sink_exports.keys().next().expect("valid sink");
152        *sink_id
153    }
154
155    pub fn as_of(&self) -> Option<Timestamp> {
156        self.df_desc.as_of.clone().map(|as_of| as_of.into_element())
157    }
158
159    pub fn sink_desc(&self) -> &ComputeSinkDesc {
160        let sink_exports = &self.df_desc.sink_exports;
161        let sink_desc = sink_exports.values().next().expect("valid sink");
162        sink_desc
163    }
164}
165
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result without a resolved timestamp.
///
/// This is the state produced by the `SubscribeFrom` optimization stage; call
/// `resolve()` on the plan to obtain the [`Resolved`] variant.
#[derive(Clone, Debug)]
pub struct Unresolved;
170
/// Marker type for [`GlobalMirPlan`] structs representing an optimization
/// result with a resolved timestamp.
///
/// The actual timestamp value is set in the [`MirDataflowDescription`] of the
/// surrounding [`GlobalMirPlan`] when we call `resolve()`. Only plans in this
/// state are accepted by the `GlobalMirPlan` ⇒ `GlobalLirPlan` stage.
#[derive(Clone, Debug)]
pub struct Resolved;
178
179impl Optimize<SubscribeFrom> for Optimizer {
180    type To = GlobalMirPlan<Unresolved>;
181
182    fn optimize(&mut self, plan: SubscribeFrom) -> Result<Self::To, OptimizerError> {
183        let time = Instant::now();
184
185        let mut df_builder = {
186            let compute = self.compute_instance.clone();
187            DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
188        };
189        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
190        let mut df_meta = DataflowMetainfo::default();
191
192        match plan {
193            SubscribeFrom::Id(from_id) => {
194                let from = self.catalog.get_entry(&from_id);
195                let from_desc = from
196                    .desc(
197                        &self
198                            .catalog
199                            .resolve_full_name(from.name(), self.conn_id.as_ref()),
200                    )
201                    .expect("subscribes can only be run on items with descs")
202                    .into_owned();
203
204                df_builder.import_into_dataflow(&from_id, &mut df_desc, &self.config.features)?;
205                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
206
207                // Make SinkDesc
208                let sink_description = ComputeSinkDesc {
209                    from: from_id,
210                    from_desc,
211                    connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
212                    with_snapshot: self.with_snapshot,
213                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
214                    // No `FORCE NOT NULL` for subscribes
215                    non_null_assertions: vec![],
216                    // No `REFRESH` for subscribes
217                    refresh_schedule: None,
218                };
219                df_desc.export_sink(self.sink_id, sink_description);
220            }
221            SubscribeFrom::Query { expr, desc } => {
222                // TODO: Change the `expr` type to be `HirRelationExpr` and run
223                // HIR ⇒ MIR lowering and decorrelation here. This would allow
224                // us implement something like `EXPLAIN RAW PLAN FOR SUBSCRIBE.`
225                //
226                // let expr = expr.lower(&self.config)?;
227
228                // MIR ⇒ MIR optimization (local)
229                let mut transform_ctx = TransformCtx::local(
230                    &self.config.features,
231                    &self.typecheck_ctx,
232                    &mut df_meta,
233                    Some(&self.metrics),
234                    Some(self.view_id),
235                );
236                let expr = optimize_mir_local(expr, &mut transform_ctx)?;
237
238                df_builder.import_view_into_dataflow(
239                    &self.view_id,
240                    &expr,
241                    &mut df_desc,
242                    &self.config.features,
243                )?;
244                df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
245
246                // Make SinkDesc
247                let sink_description = ComputeSinkDesc {
248                    from: self.view_id,
249                    from_desc: RelationDesc::new(expr.typ(), desc.iter_names()),
250                    connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection::default()),
251                    with_snapshot: self.with_snapshot,
252                    up_to: self.up_to.map(Antichain::from_elem).unwrap_or_default(),
253                    // No `FORCE NOT NULL` for subscribes
254                    non_null_assertions: vec![],
255                    // No `REFRESH` for subscribes
256                    refresh_schedule: None,
257                };
258                df_desc.export_sink(self.sink_id, sink_description);
259            }
260        };
261
262        // Prepare expressions in the assembled dataflow.
263        let style = ExprPrepStyle::Maintained;
264        df_desc.visit_children(
265            |r| prep_relation_expr(r, style),
266            |s| prep_scalar_expr(s, style),
267        )?;
268
269        // Construct TransformCtx for global optimization.
270        let mut transform_ctx = TransformCtx::global(
271            &df_builder,
272            &mz_transform::EmptyStatisticsOracle, // TODO: wire proper stats
273            &self.config.features,
274            &self.typecheck_ctx,
275            &mut df_meta,
276            Some(&self.metrics),
277        );
278        // Run global optimization.
279        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
280
281        if self.config.mode == OptimizeMode::Explain {
282            // Collect the list of indexes used by the dataflow at this point.
283            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
284        }
285
286        self.duration += time.elapsed();
287
288        // Return the (sealed) plan at the end of this optimization step.
289        Ok(GlobalMirPlan {
290            df_desc,
291            df_meta,
292            phantom: PhantomData::<Unresolved>,
293        })
294    }
295}
296
297impl GlobalMirPlan<Unresolved> {
298    /// Produces the [`GlobalMirPlan`] with [`Resolved`] timestamp.
299    ///
300    /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan`
301    /// optimization stage in order to profit from possible single-time
302    /// optimizations in the `Plan::finalize_dataflow` call.
303    pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
304        // A dataflow description for a `SUBSCRIBE` statement should not have
305        // index exports.
306        soft_assert_or_log!(
307            self.df_desc.index_exports.is_empty(),
308            "unexpectedly setting until for a DataflowDescription with an index",
309        );
310
311        // Set the `as_of` timestamp for the dataflow.
312        self.df_desc.set_as_of(as_of);
313
314        // The only outputs of the dataflow are sinks, so we might be able to
315        // turn off the computation early, if they all have non-trivial
316        // `up_to`s.
317        self.df_desc.until = Antichain::from_elem(Timestamp::MIN);
318        for (_, sink) in &self.df_desc.sink_exports {
319            self.df_desc.until.join_assign(&sink.up_to);
320        }
321
322        GlobalMirPlan {
323            df_desc: self.df_desc,
324            df_meta: self.df_meta,
325            phantom: PhantomData::<Resolved>,
326        }
327    }
328}
329
330impl Optimize<GlobalMirPlan<Resolved>> for Optimizer {
331    type To = GlobalLirPlan;
332
333    fn optimize(&mut self, plan: GlobalMirPlan<Resolved>) -> Result<Self::To, OptimizerError> {
334        let time = Instant::now();
335
336        let GlobalMirPlan {
337            mut df_desc,
338            df_meta,
339            phantom: _,
340        } = plan;
341
342        // Ensure all expressions are normalized before finalizing.
343        for build in df_desc.objects_to_build.iter_mut() {
344            normalize_lets(&mut build.plan.0, &self.config.features)?
345        }
346
347        // Finalize the dataflow. This includes:
348        // - MIR ⇒ LIR lowering
349        // - LIR ⇒ LIR transforms
350        let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
351
352        self.duration += time.elapsed();
353        self.metrics
354            .observe_e2e_optimization_time("subscribe", self.duration);
355
356        // Return the plan at the end of this `optimize` step.
357        Ok(GlobalLirPlan { df_desc, df_meta })
358    }
359}
360
361impl GlobalLirPlan {
362    /// Unwraps the parts of the final result of the optimization pipeline.
363    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
364        (self.df_desc, self.df_meta)
365    }
366}