mz_adapter/coord/sequencer/inner/
subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
21use crate::coord::{
22    Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
23    SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
24};
25use crate::error::AdapterError;
26use crate::optimize::Optimize;
27use crate::session::{Session, TransactionOps};
28use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
29
30impl Staged for SubscribeStage {
31    type Ctx = ExecuteContext;
32
33    fn validity(&mut self) -> &mut PlanValidity {
34        match self {
35            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
36            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
37            SubscribeStage::Finish(stage) => &mut stage.validity,
38        }
39    }
40
41    async fn stage(
42        self,
43        coord: &mut Coordinator,
44        ctx: &mut ExecuteContext,
45    ) -> Result<StageResult<Box<Self>>, AdapterError> {
46        match self {
47            SubscribeStage::OptimizeMir(stage) => {
48                coord.subscribe_optimize_mir(ctx.session(), stage)
49            }
50            SubscribeStage::TimestampOptimizeLir(stage) => {
51                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
52            }
53            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
54        }
55    }
56
57    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
58        Message::SubscribeStageReady {
59            ctx,
60            span,
61            stage: self,
62        }
63    }
64
65    fn cancel_enabled(&self) -> bool {
66        true
67    }
68}
69
70impl Coordinator {
71    #[instrument]
72    pub(crate) async fn sequence_subscribe(
73        &mut self,
74        mut ctx: ExecuteContext,
75        plan: plan::SubscribePlan,
76        target_cluster: TargetCluster,
77    ) {
78        let stage = return_if_err!(
79            self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
80            ctx
81        );
82        self.sequence_staged(ctx, Span::current(), stage).await;
83    }
84
85    #[instrument]
86    fn subscribe_validate(
87        &mut self,
88        session: &mut Session,
89        plan: plan::SubscribePlan,
90        target_cluster: TargetCluster,
91    ) -> Result<SubscribeStage, AdapterError> {
92        let plan::SubscribePlan { from, when, .. } = &plan;
93
94        let cluster = self
95            .catalog()
96            .resolve_target_cluster(target_cluster, session)?;
97        let cluster_id = cluster.id;
98
99        if cluster.replicas().next().is_none() {
100            return Err(AdapterError::NoClusterReplicasAvailable {
101                name: cluster.name.clone(),
102                is_managed: cluster.is_managed(),
103            });
104        }
105
106        let mut replica_id = session
107            .vars()
108            .cluster_replica()
109            .map(|name| {
110                cluster
111                    .replica_id(name)
112                    .ok_or(AdapterError::UnknownClusterReplica {
113                        cluster_name: cluster.name.clone(),
114                        replica_name: name.to_string(),
115                    })
116            })
117            .transpose()?;
118
119        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
120        // timestamp semantics.
121        if when == &QueryWhen::Immediately {
122            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
123            // only operation.
124            session.add_transaction_ops(TransactionOps::Subscribe)?;
125        }
126
127        let depends_on = from.depends_on();
128
129        // Run `check_log_reads` and emit notices.
130        let notices = check_log_reads(
131            self.catalog(),
132            cluster,
133            &depends_on,
134            &mut replica_id,
135            session.vars(),
136        )?;
137        session.add_notices(notices);
138
139        // Determine timeline.
140        let mut timeline = self
141            .catalog()
142            .validate_timeline_context(depends_on.iter().copied())?;
143        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
144            // If the from IDs are timestamp independent but the query contains temporal functions
145            // then the timeline context needs to be upgraded to timestamp dependent.
146            timeline = TimelineContext::TimestampDependent;
147        }
148
149        let dependencies = depends_on
150            .iter()
151            .map(|id| self.catalog().resolve_item_id(id))
152            .collect();
153        let validity = PlanValidity::new(
154            self.catalog().transient_revision(),
155            dependencies,
156            Some(cluster_id),
157            replica_id,
158            session.role_metadata().clone(),
159        );
160
161        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
162            validity,
163            plan,
164            timeline,
165            dependency_ids: depends_on,
166            cluster_id,
167            replica_id,
168        }))
169    }
170
171    #[instrument]
172    fn subscribe_optimize_mir(
173        &mut self,
174        session: &Session,
175        SubscribeOptimizeMir {
176            mut validity,
177            plan,
178            timeline,
179            dependency_ids,
180            cluster_id,
181            replica_id,
182        }: SubscribeOptimizeMir,
183    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
184        let plan::SubscribePlan {
185            with_snapshot,
186            up_to,
187            ..
188        } = &plan;
189
190        // Collect optimizer parameters.
191        let compute_instance = self
192            .instance_snapshot(cluster_id)
193            .expect("compute instance does not exist");
194        let (_, view_id) = self.allocate_transient_id();
195        let (_, sink_id) = self.allocate_transient_id();
196        let conn_id = session.conn_id().clone();
197        let up_to = up_to
198            .as_ref()
199            .map(|expr| Coordinator::evaluate_when(self.catalog().state(), expr.clone(), session))
200            .transpose()?;
201        let debug_name = format!("subscribe-{}", sink_id);
202        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
203            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
204
205        // Build an optimizer for this SUBSCRIBE.
206        let mut optimizer = optimize::subscribe::Optimizer::new(
207            self.owned_catalog(),
208            compute_instance,
209            view_id,
210            sink_id,
211            Some(conn_id),
212            *with_snapshot,
213            up_to,
214            debug_name,
215            optimizer_config,
216            self.optimizer_metrics(),
217        );
218        let catalog = self.owned_catalog();
219
220        let span = Span::current();
221        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
222            || "optimize subscribe (mir)",
223            move || {
224                span.in_scope(|| {
225                    // MIR ⇒ MIR optimization (global)
226                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
227                    // Add introduced indexes as validity dependencies.
228                    validity.extend_dependencies(
229                        global_mir_plan
230                            .id_bundle(optimizer.cluster_id())
231                            .iter()
232                            .map(|id| catalog.resolve_item_id(&id)),
233                    );
234
235                    let stage =
236                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
237                            validity,
238                            plan,
239                            timeline,
240                            optimizer,
241                            global_mir_plan,
242                            dependency_ids,
243                            replica_id,
244                        });
245                    Ok(Box::new(stage))
246                })
247            },
248        )))
249    }
250
251    #[instrument]
252    async fn subscribe_timestamp_optimize_lir(
253        &mut self,
254        ctx: &ExecuteContext,
255        SubscribeTimestampOptimizeLir {
256            validity,
257            plan,
258            timeline,
259            mut optimizer,
260            global_mir_plan,
261            dependency_ids,
262            replica_id,
263        }: SubscribeTimestampOptimizeLir,
264    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
265        let plan::SubscribePlan { when, .. } = &plan;
266
267        // Timestamp selection
268        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
269        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
270        let (determination, read_holds) = self.determine_timestamp(
271            ctx.session(),
272            bundle,
273            when,
274            optimizer.cluster_id(),
275            &timeline,
276            oracle_read_ts,
277            None,
278        )?;
279
280        let as_of = determination.timestamp_context.timestamp_or_default();
281
282        if let Some(id) = ctx.extra().contents() {
283            self.set_statement_execution_timestamp(id, as_of);
284        }
285        if let Some(up_to) = optimizer.up_to() {
286            if as_of == up_to {
287                ctx.session()
288                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
289            } else if as_of > up_to {
290                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
291            }
292        }
293
294        self.store_transaction_read_holds(ctx.session(), read_holds);
295
296        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
297
298        // Optimize LIR
299        let span = Span::current();
300        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
301            || "optimize subscribe (lir)",
302            move || {
303                span.in_scope(|| {
304                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
305                    let global_lir_plan =
306                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
307
308                    let stage = SubscribeStage::Finish(SubscribeFinish {
309                        validity,
310                        cluster_id: optimizer.cluster_id(),
311                        plan,
312                        global_lir_plan,
313                        dependency_ids,
314                        replica_id,
315                    });
316                    Ok(Box::new(stage))
317                })
318            },
319        )))
320    }
321
322    #[instrument]
323    async fn subscribe_finish(
324        &mut self,
325        ctx: &mut ExecuteContext,
326        SubscribeFinish {
327            validity: _,
328            cluster_id,
329            plan:
330                plan::SubscribePlan {
331                    copy_to,
332                    emit_progress,
333                    output,
334                    ..
335                },
336            global_lir_plan,
337            dependency_ids,
338            replica_id,
339        }: SubscribeFinish,
340    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
341        let sink_id = global_lir_plan.sink_id();
342
343        let (tx, rx) = mpsc::unbounded_channel();
344        let active_subscribe = ActiveSubscribe {
345            conn_id: ctx.session().conn_id().clone(),
346            session_uuid: ctx.session().uuid(),
347            channel: tx,
348            emit_progress,
349            as_of: global_lir_plan
350                .as_of()
351                .expect("set to Some in an earlier stage"),
352            arity: global_lir_plan.sink_desc().from_desc.arity(),
353            cluster_id,
354            depends_on: dependency_ids,
355            start_time: self.now(),
356            output,
357        };
358        active_subscribe.initialize();
359
360        let (df_desc, df_meta) = global_lir_plan.unapply();
361
362        // Emit notices.
363        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
364
365        // Add metadata for the new SUBSCRIBE.
366        let write_notify_fut = self
367            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
368            .await;
369        // Ship dataflow.
370        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
371
372        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
373        // requests to external services, which can take time, so we run them concurrently.
374        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
375
376        // Release the pre-optimization read holds because the controller is now handling those.
377        let txn_read_holds = self
378            .txn_read_holds
379            .remove(ctx.session().conn_id())
380            .expect("must have previously installed read holds");
381
382        // Explicitly drop read holds, just to make it obvious what's happening.
383        drop(txn_read_holds);
384
385        let resp = ExecuteResponse::Subscribing {
386            rx,
387            ctx_extra: std::mem::take(ctx.extra_mut()),
388            instance_id: cluster_id,
389        };
390        let resp = match copy_to {
391            None => resp,
392            Some(format) => ExecuteResponse::CopyTo {
393                format,
394                resp: Box::new(resp),
395            },
396        };
397        Ok(StageResult::Response(resp))
398    }
399}