// mz_adapter/coord/sequencer/inner/subscribe.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use maplit::btreemap;
11use mz_adapter_types::connection::ConnectionId;
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_compute_types::dataflows::DataflowDescription;
15use mz_compute_types::plan::Plan;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::GlobalId;
19use mz_repr::explain::{ExprHumanizerExt, TransientItem};
20use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
21use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
22use mz_sql::session::metadata::SessionMetadata;
23use std::collections::BTreeSet;
24use timely::progress::Antichain;
25use tokio::sync::mpsc;
26use tracing::Span;
27use uuid::Uuid;
28
29use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
30use crate::command::ExecuteResponse;
31use crate::coord::sequencer::inner::return_if_err;
32use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
33use crate::coord::{
34    Coordinator, ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
35    SubscribeExplain, SubscribeFinish, SubscribeOptimizeMir, SubscribeStage,
36    SubscribeTimestampOptimizeLir, TargetCluster,
37};
38use crate::error::AdapterError;
39use crate::explain::optimizer_trace::OptimizerTrace;
40use crate::optimize::Optimize;
41use crate::session::{Session, TransactionOps};
42use crate::{
43    AdapterNotice, ExecuteContext, ExecuteContextGuard, ReadHolds, TimelineContext, optimize,
44};
45
46impl Staged for SubscribeStage {
47    type Ctx = ExecuteContext;
48
49    fn validity(&mut self) -> &mut PlanValidity {
50        match self {
51            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
52            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
53            SubscribeStage::Finish(stage) => &mut stage.validity,
54            SubscribeStage::Explain(stage) => &mut stage.validity,
55        }
56    }
57
58    async fn stage(
59        self,
60        coord: &mut Coordinator,
61        ctx: &mut ExecuteContext,
62    ) -> Result<StageResult<Box<Self>>, AdapterError> {
63        match self {
64            SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
65            SubscribeStage::TimestampOptimizeLir(stage) => {
66                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
67            }
68            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
69            SubscribeStage::Explain(stage) => coord.subscribe_explain(ctx.session(), stage).await,
70        }
71    }
72
73    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
74        Message::SubscribeStageReady {
75            ctx,
76            span,
77            stage: self,
78        }
79    }
80
81    fn cancel_enabled(&self) -> bool {
82        true
83    }
84}
85
86impl Coordinator {
87    #[instrument]
88    pub(crate) async fn sequence_subscribe(
89        &mut self,
90        mut ctx: ExecuteContext,
91        plan: plan::SubscribePlan,
92        target_cluster: TargetCluster,
93    ) {
94        let stage = return_if_err!(
95            self.subscribe_validate(
96                ctx.session_mut(),
97                plan,
98                target_cluster,
99                ExplainContext::None
100            ),
101            ctx
102        );
103        self.sequence_staged(ctx, Span::current(), stage).await;
104    }
105
106    #[instrument]
107    pub(crate) async fn explain_subscribe(
108        &mut self,
109        mut ctx: ExecuteContext,
110        plan::ExplainPlanPlan {
111            stage,
112            format,
113            config,
114            explainee,
115        }: plan::ExplainPlanPlan,
116        target_cluster: TargetCluster,
117    ) {
118        let plan::Explainee::Statement(stmt) = explainee else {
119            // This is currently asserted in the `sequence_explain_plan` code that
120            // calls this method.
121            unreachable!()
122        };
123        let plan::ExplaineeStatement::Subscribe { broken, plan } = stmt else {
124            // This is currently asserted in the `sequence_explain_plan` code that
125            // calls this method.
126            unreachable!()
127        };
128
129        let desc = match &plan.from {
130            SubscribeFrom::Id(_) => None,
131            SubscribeFrom::Query { desc, .. } => Some(desc.clone()),
132        };
133
134        // Create an OptimizerTrace instance to collect plans emitted when
135        // executing the optimizer pipeline.
136        let optimizer_trace = OptimizerTrace::new(stage.paths());
137
138        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
139            broken,
140            config,
141            format,
142            stage,
143            replan: None,
144            desc,
145            optimizer_trace,
146        });
147        let stage = return_if_err!(
148            self.subscribe_validate(ctx.session_mut(), plan, target_cluster, explain_ctx),
149            ctx
150        );
151        self.sequence_staged(ctx, Span::current(), stage).await;
152    }
153
154    #[instrument]
155    fn subscribe_validate(
156        &self,
157        session: &mut Session,
158        plan: plan::SubscribePlan,
159        target_cluster: TargetCluster,
160        explain_ctx: ExplainContext,
161    ) -> Result<SubscribeStage, AdapterError> {
162        let plan::SubscribePlan { from, when, .. } = &plan;
163
164        let cluster = self
165            .catalog()
166            .resolve_target_cluster(target_cluster, session)?;
167        let cluster_id = cluster.id;
168
169        // Only check cluster replicas if we're not in explain mode.
170        if explain_ctx.needs_cluster() && cluster.replicas().next().is_none() {
171            return Err(AdapterError::NoClusterReplicasAvailable {
172                name: cluster.name.clone(),
173                is_managed: cluster.is_managed(),
174            });
175        }
176
177        let mut replica_id = session
178            .vars()
179            .cluster_replica()
180            .map(|name| {
181                cluster
182                    .replica_id(name)
183                    .ok_or(AdapterError::UnknownClusterReplica {
184                        cluster_name: cluster.name.clone(),
185                        replica_name: name.to_string(),
186                    })
187            })
188            .transpose()?;
189
190        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
191        // timestamp semantics.
192        if when == &QueryWhen::Immediately {
193            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
194            // only operation.
195            session.add_transaction_ops(TransactionOps::Subscribe)?;
196        }
197
198        let depends_on = from.depends_on();
199
200        // Run `check_log_reads` and emit notices.
201        let notices = check_log_reads(
202            self.catalog(),
203            cluster,
204            &depends_on,
205            &mut replica_id,
206            session.vars(),
207        )?;
208        session.add_notices(notices);
209
210        // Determine timeline.
211        let mut timeline = self
212            .catalog()
213            .validate_timeline_context(depends_on.iter().copied())?;
214        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
215            // If the from IDs are timestamp independent but the query contains temporal functions
216            // then the timeline context needs to be upgraded to timestamp dependent.
217            timeline = TimelineContext::TimestampDependent;
218        }
219
220        let dependencies = depends_on
221            .iter()
222            .map(|id| self.catalog().resolve_item_id(id))
223            .collect();
224        let validity = PlanValidity::new(
225            self.catalog().transient_revision(),
226            dependencies,
227            Some(cluster_id),
228            replica_id,
229            session.role_metadata().clone(),
230        );
231
232        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
233            validity,
234            plan,
235            timeline,
236            dependency_ids: depends_on,
237            cluster_id,
238            replica_id,
239            explain_ctx,
240        }))
241    }
242
243    #[instrument]
244    fn subscribe_optimize_mir(
245        &self,
246        SubscribeOptimizeMir {
247            mut validity,
248            plan,
249            timeline,
250            dependency_ids,
251            cluster_id,
252            replica_id,
253            explain_ctx,
254        }: SubscribeOptimizeMir,
255    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
256        let plan::SubscribePlan {
257            with_snapshot,
258            up_to,
259            ..
260        } = &plan;
261
262        // Collect optimizer parameters.
263        let compute_instance = self
264            .instance_snapshot(cluster_id)
265            .expect("compute instance does not exist");
266        let (_, view_id) = self.allocate_transient_id();
267        let (_, sink_id) = self.allocate_transient_id();
268        let debug_name = format!("subscribe-{}", sink_id);
269        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
270            .override_from(&self.catalog.get_cluster(cluster_id).config.features())
271            .override_from(&explain_ctx);
272
273        // Build an optimizer for this SUBSCRIBE.
274        let mut optimizer = optimize::subscribe::Optimizer::new(
275            self.owned_catalog(),
276            compute_instance,
277            view_id,
278            sink_id,
279            *with_snapshot,
280            *up_to,
281            debug_name,
282            optimizer_config,
283            self.optimizer_metrics(),
284        );
285        let catalog = self.owned_catalog();
286
287        let span = Span::current();
288        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
289            || "optimize subscribe (mir)",
290            move || {
291                span.in_scope(|| {
292                    let _dispatch_guard = explain_ctx.dispatch_guard();
293
294                    // MIR ⇒ MIR optimization (global)
295                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
296                    // Add introduced indexes as validity dependencies.
297                    validity.extend_dependencies(
298                        global_mir_plan
299                            .id_bundle(optimizer.cluster_id())
300                            .iter()
301                            .map(|id| catalog.resolve_item_id(&id)),
302                    );
303
304                    let stage =
305                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
306                            validity,
307                            plan,
308                            timeline,
309                            optimizer,
310                            global_mir_plan,
311                            dependency_ids,
312                            replica_id,
313                            explain_ctx,
314                        });
315                    Ok(Box::new(stage))
316                })
317            },
318        )))
319    }
320
321    #[instrument]
322    async fn subscribe_timestamp_optimize_lir(
323        &mut self,
324        ctx: &ExecuteContext,
325        SubscribeTimestampOptimizeLir {
326            validity,
327            plan,
328            timeline,
329            mut optimizer,
330            global_mir_plan,
331            dependency_ids,
332            replica_id,
333            explain_ctx,
334        }: SubscribeTimestampOptimizeLir,
335    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
336        let plan::SubscribePlan { when, .. } = &plan;
337
338        // Timestamp selection
339        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
340        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
341        let (determination, read_holds) = self.determine_timestamp(
342            ctx.session(),
343            bundle,
344            when,
345            optimizer.cluster_id(),
346            &timeline,
347            oracle_read_ts,
348            None,
349        )?;
350
351        let as_of = determination.timestamp_context.timestamp_or_default();
352
353        if let Some(id) = ctx.extra().contents() {
354            self.set_statement_execution_timestamp(id, as_of);
355        }
356        if let Some(up_to) = optimizer.up_to() {
357            if as_of == up_to {
358                ctx.session()
359                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
360            } else if as_of > up_to {
361                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
362            }
363        }
364
365        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
366
367        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
368
369        // Optimize LIR
370        let span = Span::current();
371        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
372            || "optimize subscribe (lir)",
373            move || {
374                span.in_scope(|| {
375                    let _dispatch_guard = explain_ctx.dispatch_guard();
376
377                    let cluster_id = optimizer.cluster_id();
378
379                    let mut pipeline = || -> Result<_, AdapterError> {
380                        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
381                        let global_lir_plan =
382                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
383                        Ok(global_lir_plan)
384                    };
385
386                    let stage = match pipeline() {
387                        Ok(global_lir_plan) => {
388                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
389                                let (_, df_meta) = global_lir_plan.unapply();
390                                SubscribeStage::Explain(SubscribeExplain {
391                                    validity,
392                                    optimizer,
393                                    df_meta,
394                                    cluster_id,
395                                    explain_ctx,
396                                })
397                            } else {
398                                SubscribeStage::Finish(SubscribeFinish {
399                                    validity,
400                                    cluster_id,
401                                    plan,
402                                    global_lir_plan,
403                                    dependency_ids,
404                                    replica_id,
405                                })
406                            }
407                        }
408                        Err(err) => {
409                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
410                                return Err(err);
411                            };
412
413                            if explain_ctx.broken {
414                                tracing::error!("error while handling EXPLAIN statement: {}", err);
415                                SubscribeStage::Explain(SubscribeExplain {
416                                    validity,
417                                    optimizer,
418                                    df_meta: Default::default(),
419                                    cluster_id,
420                                    explain_ctx,
421                                })
422                            } else {
423                                return Err(err);
424                            }
425                        }
426                    };
427
428                    Ok(Box::new(stage))
429                })
430            },
431        )))
432    }
433
434    #[instrument]
435    async fn subscribe_finish(
436        &mut self,
437        ctx: &mut ExecuteContext,
438        SubscribeFinish {
439            validity: _,
440            cluster_id,
441            plan,
442            global_lir_plan,
443            dependency_ids,
444            replica_id,
445        }: SubscribeFinish,
446    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
447        let (df_desc, df_meta) = global_lir_plan.unapply();
448        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
449        let conn_id = ctx.session.conn_id().clone();
450        let session_uuid = ctx.session().uuid();
451        let txn_read_holds = self
452            .txn_read_holds
453            .remove(&conn_id)
454            .expect("must have previously installed read holds");
455        let resp = self
456            .implement_subscribe(
457                ctx.extra_mut(),
458                df_desc,
459                dependency_ids,
460                cluster_id,
461                replica_id,
462                conn_id,
463                session_uuid,
464                txn_read_holds,
465                plan,
466            )
467            .await?;
468        Ok(StageResult::Response(resp))
469    }
470
471    #[instrument]
472    pub(crate) async fn implement_subscribe(
473        &mut self,
474        ctx_extra: &mut ExecuteContextGuard,
475        df_desc: DataflowDescription<Plan>,
476        dependency_ids: BTreeSet<GlobalId>,
477        cluster_id: ComputeInstanceId,
478        replica_id: Option<ReplicaId>,
479        conn_id: ConnectionId,
480        session_uuid: Uuid,
481        read_holds: ReadHolds<mz_repr::Timestamp>,
482        plan: plan::SubscribePlan,
483    ) -> Result<ExecuteResponse, AdapterError> {
484        let sink_id = df_desc.sink_id();
485
486        let (tx, rx) = mpsc::unbounded_channel();
487        let active_subscribe = ActiveSubscribe {
488            conn_id: conn_id.clone(),
489            session_uuid,
490            channel: tx,
491            emit_progress: plan.emit_progress,
492            as_of: df_desc
493                .as_of
494                .as_ref()
495                .and_then(|t| t.as_option())
496                .copied()
497                .expect("set to Some in an earlier stage"),
498            arity: df_desc
499                .sink_exports
500                .values()
501                .into_element()
502                .from_desc
503                .arity(),
504            cluster_id,
505            depends_on: dependency_ids,
506            start_time: self.now(),
507            output: plan.output,
508        };
509        active_subscribe.initialize();
510
511        // Add metadata for the new SUBSCRIBE.
512        let write_notify_fut = self
513            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
514            .await;
515        // Ship dataflow.
516        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
517
518        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
519        // requests to external services, which can take time, so we run them concurrently.
520        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
521
522        // Explicitly drop read holds, just to make it obvious what's happening.
523        drop(read_holds);
524
525        let resp = ExecuteResponse::Subscribing {
526            rx,
527            ctx_extra: std::mem::take(ctx_extra),
528            instance_id: cluster_id,
529        };
530        let resp = match plan.copy_to {
531            None => resp,
532            Some(format) => ExecuteResponse::CopyTo {
533                format,
534                resp: Box::new(resp),
535            },
536        };
537        Ok(resp)
538    }
539
540    #[instrument]
541    async fn subscribe_explain(
542        &self,
543        session: &Session,
544        SubscribeExplain {
545            optimizer,
546            df_meta,
547            cluster_id,
548            explain_ctx:
549                ExplainPlanContext {
550                    config,
551                    format,
552                    stage,
553                    optimizer_trace,
554                    desc,
555                    ..
556                },
557            ..
558        }: SubscribeExplain,
559    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
560        let session_catalog = self.catalog().for_session(session);
561
562        let expr_humanizer = {
563            let transient_items = btreemap! {
564                optimizer.sink_id() => TransientItem::new(
565                    Some(vec![GlobalId::Explain.to_string()]),
566                    desc.map(|d| d.iter_names().map(|c| c.to_string()).collect()),
567                )
568            };
569            ExprHumanizerExt::new(transient_items, &session_catalog)
570        };
571
572        let target_cluster = self.catalog().get_cluster(cluster_id);
573
574        let features = OptimizerFeatures::from(self.catalog().system_config())
575            .override_from(&target_cluster.config.features())
576            .override_from(&config.features);
577
578        let rows = optimizer_trace
579            .into_rows(
580                format,
581                &config,
582                &features,
583                &expr_humanizer,
584                None,
585                Some(target_cluster),
586                df_meta,
587                stage,
588                plan::ExplaineeStatementKind::Subscribe,
589                None,
590            )
591            .await?;
592
593        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
594    }
595}