mz_adapter/coord/sequencer/inner/
subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
22use crate::coord::{
23    Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
24    SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::Optimize;
28use crate::session::{Session, TransactionOps};
29use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
30
31impl Staged for SubscribeStage {
32    type Ctx = ExecuteContext;
33
34    fn validity(&mut self) -> &mut PlanValidity {
35        match self {
36            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
37            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
38            SubscribeStage::Finish(stage) => &mut stage.validity,
39        }
40    }
41
42    async fn stage(
43        self,
44        coord: &mut Coordinator,
45        ctx: &mut ExecuteContext,
46    ) -> Result<StageResult<Box<Self>>, AdapterError> {
47        match self {
48            SubscribeStage::OptimizeMir(stage) => {
49                coord.subscribe_optimize_mir(ctx.session(), stage)
50            }
51            SubscribeStage::TimestampOptimizeLir(stage) => {
52                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
53            }
54            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
55        }
56    }
57
58    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
59        Message::SubscribeStageReady {
60            ctx,
61            span,
62            stage: self,
63        }
64    }
65
66    fn cancel_enabled(&self) -> bool {
67        true
68    }
69}
70
71impl Coordinator {
72    #[instrument]
73    pub(crate) async fn sequence_subscribe(
74        &mut self,
75        mut ctx: ExecuteContext,
76        plan: plan::SubscribePlan,
77        target_cluster: TargetCluster,
78    ) {
79        let stage = return_if_err!(
80            self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
81            ctx
82        );
83        self.sequence_staged(ctx, Span::current(), stage).await;
84    }
85
86    #[instrument]
87    fn subscribe_validate(
88        &mut self,
89        session: &mut Session,
90        plan: plan::SubscribePlan,
91        target_cluster: TargetCluster,
92    ) -> Result<SubscribeStage, AdapterError> {
93        let plan::SubscribePlan { from, when, .. } = &plan;
94
95        let cluster = self
96            .catalog()
97            .resolve_target_cluster(target_cluster, session)?;
98        let cluster_id = cluster.id;
99
100        if cluster.replicas().next().is_none() {
101            return Err(AdapterError::NoClusterReplicasAvailable {
102                name: cluster.name.clone(),
103                is_managed: cluster.is_managed(),
104            });
105        }
106
107        let mut replica_id = session
108            .vars()
109            .cluster_replica()
110            .map(|name| {
111                cluster
112                    .replica_id(name)
113                    .ok_or(AdapterError::UnknownClusterReplica {
114                        cluster_name: cluster.name.clone(),
115                        replica_name: name.to_string(),
116                    })
117            })
118            .transpose()?;
119
120        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
121        // timestamp semantics.
122        if when == &QueryWhen::Immediately {
123            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
124            // only operation.
125            session.add_transaction_ops(TransactionOps::Subscribe)?;
126        }
127
128        let depends_on = from.depends_on();
129
130        // Run `check_log_reads` and emit notices.
131        let notices = check_log_reads(
132            self.catalog(),
133            cluster,
134            &depends_on,
135            &mut replica_id,
136            session.vars(),
137        )?;
138        session.add_notices(notices);
139
140        // Determine timeline.
141        let mut timeline = self
142            .catalog()
143            .validate_timeline_context(depends_on.iter().copied())?;
144        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
145            // If the from IDs are timestamp independent but the query contains temporal functions
146            // then the timeline context needs to be upgraded to timestamp dependent.
147            timeline = TimelineContext::TimestampDependent;
148        }
149
150        let dependencies = depends_on
151            .iter()
152            .map(|id| self.catalog().resolve_item_id(id))
153            .collect();
154        let validity = PlanValidity::new(
155            self.catalog().transient_revision(),
156            dependencies,
157            Some(cluster_id),
158            replica_id,
159            session.role_metadata().clone(),
160        );
161
162        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
163            validity,
164            plan,
165            timeline,
166            dependency_ids: depends_on,
167            cluster_id,
168            replica_id,
169        }))
170    }
171
172    #[instrument]
173    fn subscribe_optimize_mir(
174        &mut self,
175        session: &Session,
176        SubscribeOptimizeMir {
177            mut validity,
178            plan,
179            timeline,
180            dependency_ids,
181            cluster_id,
182            replica_id,
183        }: SubscribeOptimizeMir,
184    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
185        let plan::SubscribePlan {
186            with_snapshot,
187            up_to,
188            ..
189        } = &plan;
190
191        // Collect optimizer parameters.
192        let compute_instance = self
193            .instance_snapshot(cluster_id)
194            .expect("compute instance does not exist");
195        let (_, view_id) = self.allocate_transient_id();
196        let (_, sink_id) = self.allocate_transient_id();
197        let conn_id = session.conn_id().clone();
198        let debug_name = format!("subscribe-{}", sink_id);
199        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
200            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
201
202        // Build an optimizer for this SUBSCRIBE.
203        let mut optimizer = optimize::subscribe::Optimizer::new(
204            self.owned_catalog(),
205            compute_instance,
206            view_id,
207            sink_id,
208            Some(conn_id),
209            *with_snapshot,
210            *up_to,
211            debug_name,
212            optimizer_config,
213            self.optimizer_metrics(),
214        );
215        let catalog = self.owned_catalog();
216
217        let span = Span::current();
218        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
219            || "optimize subscribe (mir)",
220            move || {
221                span.in_scope(|| {
222                    // MIR ⇒ MIR optimization (global)
223                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
224                    // Add introduced indexes as validity dependencies.
225                    validity.extend_dependencies(
226                        global_mir_plan
227                            .id_bundle(optimizer.cluster_id())
228                            .iter()
229                            .map(|id| catalog.resolve_item_id(&id)),
230                    );
231
232                    let stage =
233                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
234                            validity,
235                            plan,
236                            timeline,
237                            optimizer,
238                            global_mir_plan,
239                            dependency_ids,
240                            replica_id,
241                        });
242                    Ok(Box::new(stage))
243                })
244            },
245        )))
246    }
247
248    #[instrument]
249    async fn subscribe_timestamp_optimize_lir(
250        &mut self,
251        ctx: &ExecuteContext,
252        SubscribeTimestampOptimizeLir {
253            validity,
254            plan,
255            timeline,
256            mut optimizer,
257            global_mir_plan,
258            dependency_ids,
259            replica_id,
260        }: SubscribeTimestampOptimizeLir,
261    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
262        let plan::SubscribePlan { when, .. } = &plan;
263
264        // Timestamp selection
265        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
266        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
267        let (determination, read_holds) = self.determine_timestamp(
268            ctx.session(),
269            bundle,
270            when,
271            optimizer.cluster_id(),
272            &timeline,
273            oracle_read_ts,
274            None,
275        )?;
276
277        let as_of = determination.timestamp_context.timestamp_or_default();
278
279        if let Some(id) = ctx.extra().contents() {
280            self.set_statement_execution_timestamp(id, as_of);
281        }
282        if let Some(up_to) = optimizer.up_to() {
283            if as_of == up_to {
284                ctx.session()
285                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
286            } else if as_of > up_to {
287                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
288            }
289        }
290
291        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
292
293        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
294
295        // Optimize LIR
296        let span = Span::current();
297        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
298            || "optimize subscribe (lir)",
299            move || {
300                span.in_scope(|| {
301                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
302                    let global_lir_plan =
303                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
304
305                    let stage = SubscribeStage::Finish(SubscribeFinish {
306                        validity,
307                        cluster_id: optimizer.cluster_id(),
308                        plan,
309                        global_lir_plan,
310                        dependency_ids,
311                        replica_id,
312                    });
313                    Ok(Box::new(stage))
314                })
315            },
316        )))
317    }
318
319    #[instrument]
320    async fn subscribe_finish(
321        &mut self,
322        ctx: &mut ExecuteContext,
323        SubscribeFinish {
324            validity: _,
325            cluster_id,
326            plan:
327                plan::SubscribePlan {
328                    copy_to,
329                    emit_progress,
330                    output,
331                    ..
332                },
333            global_lir_plan,
334            dependency_ids,
335            replica_id,
336        }: SubscribeFinish,
337    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
338        let sink_id = global_lir_plan.sink_id();
339
340        let (tx, rx) = mpsc::unbounded_channel();
341        let active_subscribe = ActiveSubscribe {
342            conn_id: ctx.session().conn_id().clone(),
343            session_uuid: ctx.session().uuid(),
344            channel: tx,
345            emit_progress,
346            as_of: global_lir_plan
347                .as_of()
348                .expect("set to Some in an earlier stage"),
349            arity: global_lir_plan.sink_desc().from_desc.arity(),
350            cluster_id,
351            depends_on: dependency_ids,
352            start_time: self.now(),
353            output,
354        };
355        active_subscribe.initialize();
356
357        let (df_desc, df_meta) = global_lir_plan.unapply();
358
359        // Emit notices.
360        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
361
362        // Add metadata for the new SUBSCRIBE.
363        let write_notify_fut = self
364            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
365            .await;
366        // Ship dataflow.
367        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
368
369        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
370        // requests to external services, which can take time, so we run them concurrently.
371        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
372
373        // Release the pre-optimization read holds because the controller is now handling those.
374        let txn_read_holds = self
375            .txn_read_holds
376            .remove(ctx.session().conn_id())
377            .expect("must have previously installed read holds");
378
379        // Explicitly drop read holds, just to make it obvious what's happening.
380        drop(txn_read_holds);
381
382        let resp = ExecuteResponse::Subscribing {
383            rx,
384            ctx_extra: std::mem::take(ctx.extra_mut()),
385            instance_id: cluster_id,
386        };
387        let resp = match copy_to {
388            None => resp,
389            Some(format) => ExecuteResponse::CopyTo {
390                format,
391                resp: Box::new(resp),
392            },
393        };
394        Ok(StageResult::Response(resp))
395    }
396}