// mz_adapter/coord/sequencer/inner/subscribe.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
21use crate::coord::{
22    Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
23    SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
24};
25use crate::error::AdapterError;
26use crate::optimize::Optimize;
27use crate::session::{Session, TransactionOps};
28use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
29
30impl Staged for SubscribeStage {
31    type Ctx = ExecuteContext;
32
33    fn validity(&mut self) -> &mut PlanValidity {
34        match self {
35            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
36            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
37            SubscribeStage::Finish(stage) => &mut stage.validity,
38        }
39    }
40
41    async fn stage(
42        self,
43        coord: &mut Coordinator,
44        ctx: &mut ExecuteContext,
45    ) -> Result<StageResult<Box<Self>>, AdapterError> {
46        match self {
47            SubscribeStage::OptimizeMir(stage) => {
48                coord.subscribe_optimize_mir(ctx.session(), stage)
49            }
50            SubscribeStage::TimestampOptimizeLir(stage) => {
51                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
52            }
53            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
54        }
55    }
56
57    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
58        Message::SubscribeStageReady {
59            ctx,
60            span,
61            stage: self,
62        }
63    }
64
65    fn cancel_enabled(&self) -> bool {
66        true
67    }
68}
69
70impl Coordinator {
71    #[instrument]
72    pub(crate) async fn sequence_subscribe(
73        &mut self,
74        mut ctx: ExecuteContext,
75        plan: plan::SubscribePlan,
76        target_cluster: TargetCluster,
77    ) {
78        let stage = return_if_err!(
79            self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
80            ctx
81        );
82        self.sequence_staged(ctx, Span::current(), stage).await;
83    }
84
85    #[instrument]
86    fn subscribe_validate(
87        &mut self,
88        session: &mut Session,
89        plan: plan::SubscribePlan,
90        target_cluster: TargetCluster,
91    ) -> Result<SubscribeStage, AdapterError> {
92        let plan::SubscribePlan { from, when, .. } = &plan;
93
94        let cluster = self
95            .catalog()
96            .resolve_target_cluster(target_cluster, session)?;
97        let cluster_id = cluster.id;
98
99        if cluster.replicas().next().is_none() {
100            return Err(AdapterError::NoClusterReplicasAvailable {
101                name: cluster.name.clone(),
102                is_managed: cluster.is_managed(),
103            });
104        }
105
106        let mut replica_id = session
107            .vars()
108            .cluster_replica()
109            .map(|name| {
110                cluster
111                    .replica_id(name)
112                    .ok_or(AdapterError::UnknownClusterReplica {
113                        cluster_name: cluster.name.clone(),
114                        replica_name: name.to_string(),
115                    })
116            })
117            .transpose()?;
118
119        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
120        // timestamp semantics.
121        if when == &QueryWhen::Immediately {
122            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
123            // only operation.
124            session.add_transaction_ops(TransactionOps::Subscribe)?;
125        }
126
127        let depends_on = from.depends_on();
128
129        // Run `check_log_reads` and emit notices.
130        let notices = check_log_reads(
131            self.catalog(),
132            cluster,
133            &depends_on,
134            &mut replica_id,
135            session.vars(),
136        )?;
137        session.add_notices(notices);
138
139        // Determine timeline.
140        let mut timeline = self
141            .catalog()
142            .validate_timeline_context(depends_on.iter().copied())?;
143        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
144            // If the from IDs are timestamp independent but the query contains temporal functions
145            // then the timeline context needs to be upgraded to timestamp dependent.
146            timeline = TimelineContext::TimestampDependent;
147        }
148
149        let dependencies = depends_on
150            .iter()
151            .map(|id| self.catalog().resolve_item_id(id))
152            .collect();
153        let validity = PlanValidity::new(
154            self.catalog().transient_revision(),
155            dependencies,
156            Some(cluster_id),
157            replica_id,
158            session.role_metadata().clone(),
159        );
160
161        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
162            validity,
163            plan,
164            timeline,
165            dependency_ids: depends_on,
166            cluster_id,
167            replica_id,
168        }))
169    }
170
171    #[instrument]
172    fn subscribe_optimize_mir(
173        &mut self,
174        session: &Session,
175        SubscribeOptimizeMir {
176            mut validity,
177            plan,
178            timeline,
179            dependency_ids,
180            cluster_id,
181            replica_id,
182        }: SubscribeOptimizeMir,
183    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
184        let plan::SubscribePlan {
185            with_snapshot,
186            up_to,
187            ..
188        } = &plan;
189
190        // Collect optimizer parameters.
191        let compute_instance = self
192            .instance_snapshot(cluster_id)
193            .expect("compute instance does not exist");
194        let (_, view_id) = self.allocate_transient_id();
195        let (_, sink_id) = self.allocate_transient_id();
196        let conn_id = session.conn_id().clone();
197        let debug_name = format!("subscribe-{}", sink_id);
198        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
199            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
200
201        // Build an optimizer for this SUBSCRIBE.
202        let mut optimizer = optimize::subscribe::Optimizer::new(
203            self.owned_catalog(),
204            compute_instance,
205            view_id,
206            sink_id,
207            Some(conn_id),
208            *with_snapshot,
209            *up_to,
210            debug_name,
211            optimizer_config,
212            self.optimizer_metrics(),
213        );
214        let catalog = self.owned_catalog();
215
216        let span = Span::current();
217        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
218            || "optimize subscribe (mir)",
219            move || {
220                span.in_scope(|| {
221                    // MIR ⇒ MIR optimization (global)
222                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
223                    // Add introduced indexes as validity dependencies.
224                    validity.extend_dependencies(
225                        global_mir_plan
226                            .id_bundle(optimizer.cluster_id())
227                            .iter()
228                            .map(|id| catalog.resolve_item_id(&id)),
229                    );
230
231                    let stage =
232                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
233                            validity,
234                            plan,
235                            timeline,
236                            optimizer,
237                            global_mir_plan,
238                            dependency_ids,
239                            replica_id,
240                        });
241                    Ok(Box::new(stage))
242                })
243            },
244        )))
245    }
246
247    #[instrument]
248    async fn subscribe_timestamp_optimize_lir(
249        &mut self,
250        ctx: &ExecuteContext,
251        SubscribeTimestampOptimizeLir {
252            validity,
253            plan,
254            timeline,
255            mut optimizer,
256            global_mir_plan,
257            dependency_ids,
258            replica_id,
259        }: SubscribeTimestampOptimizeLir,
260    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
261        let plan::SubscribePlan { when, .. } = &plan;
262
263        // Timestamp selection
264        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
265        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
266        let (determination, read_holds) = self.determine_timestamp(
267            ctx.session(),
268            bundle,
269            when,
270            optimizer.cluster_id(),
271            &timeline,
272            oracle_read_ts,
273            None,
274        )?;
275
276        let as_of = determination.timestamp_context.timestamp_or_default();
277
278        if let Some(id) = ctx.extra().contents() {
279            self.set_statement_execution_timestamp(id, as_of);
280        }
281        if let Some(up_to) = optimizer.up_to() {
282            if as_of == up_to {
283                ctx.session()
284                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
285            } else if as_of > up_to {
286                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
287            }
288        }
289
290        self.store_transaction_read_holds(ctx.session(), read_holds);
291
292        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
293
294        // Optimize LIR
295        let span = Span::current();
296        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
297            || "optimize subscribe (lir)",
298            move || {
299                span.in_scope(|| {
300                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
301                    let global_lir_plan =
302                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
303
304                    let stage = SubscribeStage::Finish(SubscribeFinish {
305                        validity,
306                        cluster_id: optimizer.cluster_id(),
307                        plan,
308                        global_lir_plan,
309                        dependency_ids,
310                        replica_id,
311                    });
312                    Ok(Box::new(stage))
313                })
314            },
315        )))
316    }
317
318    #[instrument]
319    async fn subscribe_finish(
320        &mut self,
321        ctx: &mut ExecuteContext,
322        SubscribeFinish {
323            validity: _,
324            cluster_id,
325            plan:
326                plan::SubscribePlan {
327                    copy_to,
328                    emit_progress,
329                    output,
330                    ..
331                },
332            global_lir_plan,
333            dependency_ids,
334            replica_id,
335        }: SubscribeFinish,
336    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
337        let sink_id = global_lir_plan.sink_id();
338
339        let (tx, rx) = mpsc::unbounded_channel();
340        let active_subscribe = ActiveSubscribe {
341            conn_id: ctx.session().conn_id().clone(),
342            session_uuid: ctx.session().uuid(),
343            channel: tx,
344            emit_progress,
345            as_of: global_lir_plan
346                .as_of()
347                .expect("set to Some in an earlier stage"),
348            arity: global_lir_plan.sink_desc().from_desc.arity(),
349            cluster_id,
350            depends_on: dependency_ids,
351            start_time: self.now(),
352            output,
353        };
354        active_subscribe.initialize();
355
356        let (df_desc, df_meta) = global_lir_plan.unapply();
357
358        // Emit notices.
359        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
360
361        // Add metadata for the new SUBSCRIBE.
362        let write_notify_fut = self
363            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
364            .await;
365        // Ship dataflow.
366        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
367
368        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
369        // requests to external services, which can take time, so we run them concurrently.
370        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
371
372        // Release the pre-optimization read holds because the controller is now handling those.
373        let txn_read_holds = self
374            .txn_read_holds
375            .remove(ctx.session().conn_id())
376            .expect("must have previously installed read holds");
377
378        // Explicitly drop read holds, just to make it obvious what's happening.
379        drop(txn_read_holds);
380
381        let resp = ExecuteResponse::Subscribing {
382            rx,
383            ctx_extra: std::mem::take(ctx.extra_mut()),
384            instance_id: cluster_id,
385        };
386        let resp = match copy_to {
387            None => resp,
388            Some(format) => ExecuteResponse::CopyTo {
389                format,
390                resp: Box::new(resp),
391            },
392        };
393        Ok(StageResult::Response(resp))
394    }
395}