mz_adapter/coord/sequencer/inner/
subscribe.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
22use crate::coord::{
23    Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
24    SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::Optimize;
28use crate::session::{Session, TransactionOps};
29use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
30
31impl Staged for SubscribeStage {
32    type Ctx = ExecuteContext;
33
34    fn validity(&mut self) -> &mut PlanValidity {
35        match self {
36            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
37            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
38            SubscribeStage::Finish(stage) => &mut stage.validity,
39        }
40    }
41
42    async fn stage(
43        self,
44        coord: &mut Coordinator,
45        ctx: &mut ExecuteContext,
46    ) -> Result<StageResult<Box<Self>>, AdapterError> {
47        match self {
48            SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
49            SubscribeStage::TimestampOptimizeLir(stage) => {
50                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
51            }
52            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
53        }
54    }
55
56    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
57        Message::SubscribeStageReady {
58            ctx,
59            span,
60            stage: self,
61        }
62    }
63
64    fn cancel_enabled(&self) -> bool {
65        true
66    }
67}
68
69impl Coordinator {
70    #[instrument]
71    pub(crate) async fn sequence_subscribe(
72        &mut self,
73        mut ctx: ExecuteContext,
74        plan: plan::SubscribePlan,
75        target_cluster: TargetCluster,
76    ) {
77        let stage = return_if_err!(
78            self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
79            ctx
80        );
81        self.sequence_staged(ctx, Span::current(), stage).await;
82    }
83
84    #[instrument]
85    fn subscribe_validate(
86        &self,
87        session: &mut Session,
88        plan: plan::SubscribePlan,
89        target_cluster: TargetCluster,
90    ) -> Result<SubscribeStage, AdapterError> {
91        let plan::SubscribePlan { from, when, .. } = &plan;
92
93        let cluster = self
94            .catalog()
95            .resolve_target_cluster(target_cluster, session)?;
96        let cluster_id = cluster.id;
97
98        if cluster.replicas().next().is_none() {
99            return Err(AdapterError::NoClusterReplicasAvailable {
100                name: cluster.name.clone(),
101                is_managed: cluster.is_managed(),
102            });
103        }
104
105        let mut replica_id = session
106            .vars()
107            .cluster_replica()
108            .map(|name| {
109                cluster
110                    .replica_id(name)
111                    .ok_or(AdapterError::UnknownClusterReplica {
112                        cluster_name: cluster.name.clone(),
113                        replica_name: name.to_string(),
114                    })
115            })
116            .transpose()?;
117
118        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
119        // timestamp semantics.
120        if when == &QueryWhen::Immediately {
121            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
122            // only operation.
123            session.add_transaction_ops(TransactionOps::Subscribe)?;
124        }
125
126        let depends_on = from.depends_on();
127
128        // Run `check_log_reads` and emit notices.
129        let notices = check_log_reads(
130            self.catalog(),
131            cluster,
132            &depends_on,
133            &mut replica_id,
134            session.vars(),
135        )?;
136        session.add_notices(notices);
137
138        // Determine timeline.
139        let mut timeline = self
140            .catalog()
141            .validate_timeline_context(depends_on.iter().copied())?;
142        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
143            // If the from IDs are timestamp independent but the query contains temporal functions
144            // then the timeline context needs to be upgraded to timestamp dependent.
145            timeline = TimelineContext::TimestampDependent;
146        }
147
148        let dependencies = depends_on
149            .iter()
150            .map(|id| self.catalog().resolve_item_id(id))
151            .collect();
152        let validity = PlanValidity::new(
153            self.catalog().transient_revision(),
154            dependencies,
155            Some(cluster_id),
156            replica_id,
157            session.role_metadata().clone(),
158        );
159
160        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
161            validity,
162            plan,
163            timeline,
164            dependency_ids: depends_on,
165            cluster_id,
166            replica_id,
167        }))
168    }
169
170    #[instrument]
171    fn subscribe_optimize_mir(
172        &self,
173        SubscribeOptimizeMir {
174            mut validity,
175            plan,
176            timeline,
177            dependency_ids,
178            cluster_id,
179            replica_id,
180        }: SubscribeOptimizeMir,
181    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
182        let plan::SubscribePlan {
183            with_snapshot,
184            up_to,
185            ..
186        } = &plan;
187
188        // Collect optimizer parameters.
189        let compute_instance = self
190            .instance_snapshot(cluster_id)
191            .expect("compute instance does not exist");
192        let (_, view_id) = self.allocate_transient_id();
193        let (_, sink_id) = self.allocate_transient_id();
194        let debug_name = format!("subscribe-{}", sink_id);
195        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
196            .override_from(&self.catalog.get_cluster(cluster_id).config.features());
197
198        // Build an optimizer for this SUBSCRIBE.
199        let mut optimizer = optimize::subscribe::Optimizer::new(
200            self.owned_catalog(),
201            compute_instance,
202            view_id,
203            sink_id,
204            *with_snapshot,
205            *up_to,
206            debug_name,
207            optimizer_config,
208            self.optimizer_metrics(),
209        );
210        let catalog = self.owned_catalog();
211
212        let span = Span::current();
213        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
214            || "optimize subscribe (mir)",
215            move || {
216                span.in_scope(|| {
217                    // MIR ⇒ MIR optimization (global)
218                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
219                    // Add introduced indexes as validity dependencies.
220                    validity.extend_dependencies(
221                        global_mir_plan
222                            .id_bundle(optimizer.cluster_id())
223                            .iter()
224                            .map(|id| catalog.resolve_item_id(&id)),
225                    );
226
227                    let stage =
228                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
229                            validity,
230                            plan,
231                            timeline,
232                            optimizer,
233                            global_mir_plan,
234                            dependency_ids,
235                            replica_id,
236                        });
237                    Ok(Box::new(stage))
238                })
239            },
240        )))
241    }
242
243    #[instrument]
244    async fn subscribe_timestamp_optimize_lir(
245        &mut self,
246        ctx: &ExecuteContext,
247        SubscribeTimestampOptimizeLir {
248            validity,
249            plan,
250            timeline,
251            mut optimizer,
252            global_mir_plan,
253            dependency_ids,
254            replica_id,
255        }: SubscribeTimestampOptimizeLir,
256    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
257        let plan::SubscribePlan { when, .. } = &plan;
258
259        // Timestamp selection
260        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
261        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
262        let (determination, read_holds) = self.determine_timestamp(
263            ctx.session(),
264            bundle,
265            when,
266            optimizer.cluster_id(),
267            &timeline,
268            oracle_read_ts,
269            None,
270        )?;
271
272        let as_of = determination.timestamp_context.timestamp_or_default();
273
274        if let Some(id) = ctx.extra().contents() {
275            self.set_statement_execution_timestamp(id, as_of);
276        }
277        if let Some(up_to) = optimizer.up_to() {
278            if as_of == up_to {
279                ctx.session()
280                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
281            } else if as_of > up_to {
282                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
283            }
284        }
285
286        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
287
288        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
289
290        // Optimize LIR
291        let span = Span::current();
292        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
293            || "optimize subscribe (lir)",
294            move || {
295                span.in_scope(|| {
296                    // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
297                    let global_lir_plan =
298                        optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
299
300                    let stage = SubscribeStage::Finish(SubscribeFinish {
301                        validity,
302                        cluster_id: optimizer.cluster_id(),
303                        plan,
304                        global_lir_plan,
305                        dependency_ids,
306                        replica_id,
307                    });
308                    Ok(Box::new(stage))
309                })
310            },
311        )))
312    }
313
314    #[instrument]
315    async fn subscribe_finish(
316        &mut self,
317        ctx: &mut ExecuteContext,
318        SubscribeFinish {
319            validity: _,
320            cluster_id,
321            plan:
322                plan::SubscribePlan {
323                    copy_to,
324                    emit_progress,
325                    output,
326                    ..
327                },
328            global_lir_plan,
329            dependency_ids,
330            replica_id,
331        }: SubscribeFinish,
332    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
333        let sink_id = global_lir_plan.sink_id();
334
335        let (tx, rx) = mpsc::unbounded_channel();
336        let active_subscribe = ActiveSubscribe {
337            conn_id: ctx.session().conn_id().clone(),
338            session_uuid: ctx.session().uuid(),
339            channel: tx,
340            emit_progress,
341            as_of: global_lir_plan
342                .as_of()
343                .expect("set to Some in an earlier stage"),
344            arity: global_lir_plan.sink_desc().from_desc.arity(),
345            cluster_id,
346            depends_on: dependency_ids,
347            start_time: self.now(),
348            output,
349        };
350        active_subscribe.initialize();
351
352        let (df_desc, df_meta) = global_lir_plan.unapply();
353
354        // Emit notices.
355        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
356
357        // Add metadata for the new SUBSCRIBE.
358        let write_notify_fut = self
359            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
360            .await;
361        // Ship dataflow.
362        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
363
364        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
365        // requests to external services, which can take time, so we run them concurrently.
366        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
367
368        // Release the pre-optimization read holds because the controller is now handling those.
369        let txn_read_holds = self
370            .txn_read_holds
371            .remove(ctx.session().conn_id())
372            .expect("must have previously installed read holds");
373
374        // Explicitly drop read holds, just to make it obvious what's happening.
375        drop(txn_read_holds);
376
377        let resp = ExecuteResponse::Subscribing {
378            rx,
379            ctx_extra: std::mem::take(ctx.extra_mut()),
380            instance_id: cluster_id,
381        };
382        let resp = match copy_to {
383            None => resp,
384            Some(format) => ExecuteResponse::CopyTo {
385                format,
386                resp: Box::new(resp),
387            },
388        };
389        Ok(StageResult::Response(resp))
390    }
391}