mz_adapter/coord/sequencer/inner/
subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
21use crate::coord::{
22    Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
23    SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
24};
25use crate::error::AdapterError;
26use crate::optimize::Optimize;
27use crate::session::{Session, TransactionOps};
28use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
29
30impl Staged for SubscribeStage {
31    type Ctx = ExecuteContext;
32
33    fn validity(&mut self) -> &mut PlanValidity {
34        match self {
35            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
36            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
37            SubscribeStage::Finish(stage) => &mut stage.validity,
38        }
39    }
40
41    async fn stage(
42        self,
43        coord: &mut Coordinator,
44        ctx: &mut ExecuteContext,
45    ) -> Result<StageResult<Box<Self>>, AdapterError> {
46        match self {
47            SubscribeStage::OptimizeMir(stage) => {
48                coord.subscribe_optimize_mir(ctx.session(), stage)
49            }
50            SubscribeStage::TimestampOptimizeLir(stage) => {
51                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
52            }
53            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
54        }
55    }
56
57    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
58        Message::SubscribeStageReady {
59            ctx,
60            span,
61            stage: self,
62        }
63    }
64
65    fn cancel_enabled(&self) -> bool {
66        true
67    }
68}
69
70impl Coordinator {
71    #[instrument]
72    pub(crate) async fn sequence_subscribe(
73        &mut self,
74        mut ctx: ExecuteContext,
75        plan: plan::SubscribePlan,
76        target_cluster: TargetCluster,
77    ) {
78        let stage = return_if_err!(
79            self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
80            ctx
81        );
82        self.sequence_staged(ctx, Span::current(), stage).await;
83    }
84
85    #[instrument]
86    fn subscribe_validate(
87        &mut self,
88        session: &mut Session,
89        plan: plan::SubscribePlan,
90        target_cluster: TargetCluster,
91    ) -> Result<SubscribeStage, AdapterError> {
92        let plan::SubscribePlan { from, when, .. } = &plan;
93
94        let cluster = self
95            .catalog()
96            .resolve_target_cluster(target_cluster, session)?;
97        let cluster_id = cluster.id;
98
99        if cluster.replicas().next().is_none() {
100            return Err(AdapterError::NoClusterReplicasAvailable {
101                name: cluster.name.clone(),
102                is_managed: cluster.is_managed(),
103            });
104        }
105
106        let mut replica_id = session
107            .vars()
108            .cluster_replica()
109            .map(|name| {
110                cluster
111                    .replica_id(name)
112                    .ok_or(AdapterError::UnknownClusterReplica {
113                        cluster_name: cluster.name.clone(),
114                        replica_name: name.to_string(),
115                    })
116            })
117            .transpose()?;
118
119        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
120        // timestamp semantics.
121        if when == &QueryWhen::Immediately {
122            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
123            // only operation.
124            session.add_transaction_ops(TransactionOps::Subscribe)?;
125        }
126
127        let depends_on = from.depends_on();
128
129        // Run `check_log_reads` and emit notices.
130        let notices = check_log_reads(
131            self.catalog(),
132            cluster,
133            &depends_on,
134            &mut replica_id,
135            session.vars(),
136        )?;
137        session.add_notices(notices);
138
139        // Determine timeline.
140        let mut timeline = self.validate_timeline_context(depends_on.iter().copied())?;
141        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
142            // If the from IDs are timestamp independent but the query contains temporal functions
143            // then the timeline context needs to be upgraded to timestamp dependent.
144            timeline = TimelineContext::TimestampDependent;
145        }
146
147        let dependencies = depends_on
148            .iter()
149            .map(|id| self.catalog().resolve_item_id(id))
150            .collect();
151        let validity = PlanValidity::new(
152            self.catalog().transient_revision(),
153            dependencies,
154            Some(cluster_id),
155            replica_id,
156            session.role_metadata().clone(),
157        );
158
159        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
160            validity,
161            plan,
162            timeline,
163            dependency_ids: depends_on,
164            cluster_id,
165            replica_id,
166        }))
167    }
168
169    #[instrument]
170    fn subscribe_optimize_mir(
171        &mut self,
172        session: &Session,
173        SubscribeOptimizeMir {
174            mut validity,
175            plan,
176            timeline,
177            dependency_ids,
178            cluster_id,
179            replica_id,
180        }: SubscribeOptimizeMir,
181    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
182        let plan::SubscribePlan {
183            with_snapshot,
184            up_to,
185            ..
186        } = &plan;
187
188        // Collect optimizer parameters.
189        let compute_instance = self
190            .instance_snapshot(cluster_id)
191            .expect("compute instance does not exist");
192        let (_, view_id) = self.allocate_transient_id();
193        let (_, sink_id) = self.allocate_transient_id();
194        let conn_id = session.conn_id().clone();
195        let up_to = up_to
196            .as_ref()
197            .map(|expr| Coordinator::evaluate_when(self.catalog().state(), expr.clone(), session))
198            .transpose()?;
199        let debug_name = format!("subscribe-{}", sink_id);
200        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
201            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
202
203        // Build an optimizer for this SUBSCRIBE.
204        let mut optimizer = optimize::subscribe::Optimizer::new(
205            self.owned_catalog(),
206            compute_instance,
207            view_id,
208            sink_id,
209            Some(conn_id),
210            *with_snapshot,
211            up_to,
212            debug_name,
213            optimizer_config,
214            self.optimizer_metrics(),
215        );
216        let catalog = self.owned_catalog();
217
218        let span = Span::current();
219        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
220            || "optimize subscribe (mir)",
221            move || {
222                span.in_scope(|| {
223                    // MIR ⇒ MIR optimization (global)
224                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
225                    // Add introduced indexes as validity dependencies.
226                    validity.extend_dependencies(
227                        global_mir_plan
228                            .id_bundle(optimizer.cluster_id())
229                            .iter()
230                            .map(|id| catalog.resolve_item_id(&id)),
231                    );
232
233                    let stage =
234                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
235                            validity,
236                            plan,
237                            timeline,
238                            optimizer,
239                            global_mir_plan,
240                            dependency_ids,
241                            replica_id,
242                        });
243                    Ok(Box::new(stage))
244                })
245            },
246        )))
247    }
248
249    #[instrument]
250    async fn subscribe_timestamp_optimize_lir(
251        &mut self,
252        ctx: &ExecuteContext,
253        SubscribeTimestampOptimizeLir {
254            validity,
255            plan,
256            timeline,
257            mut optimizer,
258            global_mir_plan,
259            dependency_ids,
260            replica_id,
261        }: SubscribeTimestampOptimizeLir,
262    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
263        let plan::SubscribePlan { when, .. } = &plan;
264
265        // Timestamp selection
266        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
267        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
268        let (determination, read_holds) = self.determine_timestamp(
269            ctx.session(),
270            bundle,
271            when,
272            optimizer.cluster_id(),
273            &timeline,
274            oracle_read_ts,
275            None,
276        )?;
277
278        let as_of = determination.timestamp_context.timestamp_or_default();
279
280        if let Some(id) = ctx.extra().contents() {
281            self.set_statement_execution_timestamp(id, as_of);
282        }
283        if let Some(up_to) = optimizer.up_to() {
284            if as_of == up_to {
285                ctx.session()
286                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
287            } else if as_of > up_to {
288                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
289            }
290        }
291
292        self.store_transaction_read_holds(ctx.session(), read_holds);
293
294        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
295
296        // Optimize LIR
297        let span = Span::current();
298        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
299            || "optimize subscribe (lir)",
300            move || {
301                span.in_scope(|| {
302                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
303                    let global_lir_plan =
304                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
305
306                    let stage = SubscribeStage::Finish(SubscribeFinish {
307                        validity,
308                        cluster_id: optimizer.cluster_id(),
309                        plan,
310                        global_lir_plan,
311                        dependency_ids,
312                        replica_id,
313                    });
314                    Ok(Box::new(stage))
315                })
316            },
317        )))
318    }
319
320    #[instrument]
321    async fn subscribe_finish(
322        &mut self,
323        ctx: &mut ExecuteContext,
324        SubscribeFinish {
325            validity: _,
326            cluster_id,
327            plan:
328                plan::SubscribePlan {
329                    copy_to,
330                    emit_progress,
331                    output,
332                    ..
333                },
334            global_lir_plan,
335            dependency_ids,
336            replica_id,
337        }: SubscribeFinish,
338    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
339        let sink_id = global_lir_plan.sink_id();
340
341        let (tx, rx) = mpsc::unbounded_channel();
342        let active_subscribe = ActiveSubscribe {
343            conn_id: ctx.session().conn_id().clone(),
344            session_uuid: ctx.session().uuid(),
345            channel: tx,
346            emit_progress,
347            as_of: global_lir_plan
348                .as_of()
349                .expect("set to Some in an earlier stage"),
350            arity: global_lir_plan.sink_desc().from_desc.arity(),
351            cluster_id,
352            depends_on: dependency_ids,
353            start_time: self.now(),
354            output,
355        };
356        active_subscribe.initialize();
357
358        let (df_desc, df_meta) = global_lir_plan.unapply();
359
360        // Emit notices.
361        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
362
363        // Add metadata for the new SUBSCRIBE.
364        let write_notify_fut = self
365            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
366            .await;
367        // Ship dataflow.
368        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
369
370        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
371        // requests to external services, which can take time, so we run them concurrently.
372        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
373
374        // Release the pre-optimization read holds because the controller is now handling those.
375        let txn_read_holds = self
376            .txn_read_holds
377            .remove(ctx.session().conn_id())
378            .expect("must have previously installed read holds");
379
380        // Explicitly drop read holds, just to make it obvious what's happening.
381        drop(txn_read_holds);
382
383        let resp = ExecuteResponse::Subscribing {
384            rx,
385            ctx_extra: std::mem::take(ctx.extra_mut()),
386            instance_id: cluster_id,
387        };
388        let resp = match copy_to {
389            None => resp,
390            Some(format) => ExecuteResponse::CopyTo {
391                format,
392                resp: Box::new(resp),
393            },
394        };
395        Ok(StageResult::Response(resp))
396    }
397}