// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use maplit::btreemap;
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_repr::GlobalId;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
use mz_sql::session::metadata::SessionMetadata;
use std::collections::BTreeSet;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::Span;
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
use crate::command::ExecuteResponse;
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
use crate::coord::{
    Coordinator, ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
    SubscribeExplain, SubscribeFinish, SubscribeOptimizeMir, SubscribeStage,
    SubscribeTimestampOptimizeLir, TargetCluster,
};
use crate::error::AdapterError;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::Optimize;
use crate::session::{Session, TransactionOps};
use crate::{
    AdapterNotice, ExecuteContext, ExecuteContextGuard, ReadHolds, TimelineContext, optimize,
};

46impl Staged for SubscribeStage {
47    type Ctx = ExecuteContext;
48
49    fn validity(&mut self) -> &mut PlanValidity {
50        match self {
51            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
52            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
53            SubscribeStage::Finish(stage) => &mut stage.validity,
54            SubscribeStage::Explain(stage) => &mut stage.validity,
55        }
56    }
57
58    async fn stage(
59        self,
60        coord: &mut Coordinator,
61        ctx: &mut ExecuteContext,
62    ) -> Result<StageResult<Box<Self>>, AdapterError> {
63        match self {
64            SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
65            SubscribeStage::TimestampOptimizeLir(stage) => {
66                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
67            }
68            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
69            SubscribeStage::Explain(stage) => coord.subscribe_explain(ctx.session(), stage).await,
70        }
71    }
72
73    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
74        Message::SubscribeStageReady {
75            ctx,
76            span,
77            stage: self,
78        }
79    }
80
81    fn cancel_enabled(&self) -> bool {
82        true
83    }
84}
85
86impl Coordinator {
87    #[instrument]
88    pub(crate) async fn sequence_subscribe(
89        &mut self,
90        mut ctx: ExecuteContext,
91        plan: plan::SubscribePlan,
92        target_cluster: TargetCluster,
93    ) {
94        let stage = return_if_err!(
95            self.subscribe_validate(
96                ctx.session_mut(),
97                plan,
98                target_cluster,
99                ExplainContext::None
100            ),
101            ctx
102        );
103        self.sequence_staged(ctx, Span::current(), stage).await;
104    }
105
106    #[instrument]
107    pub(crate) async fn explain_subscribe(
108        &mut self,
109        mut ctx: ExecuteContext,
110        plan::ExplainPlanPlan {
111            stage,
112            format,
113            config,
114            explainee,
115        }: plan::ExplainPlanPlan,
116        target_cluster: TargetCluster,
117    ) {
118        let plan::Explainee::Statement(stmt) = explainee else {
119            // This is currently asserted in the `sequence_explain_plan` code that
120            // calls this method.
121            unreachable!()
122        };
123        let plan::ExplaineeStatement::Subscribe { broken, plan } = stmt else {
124            // This is currently asserted in the `sequence_explain_plan` code that
125            // calls this method.
126            unreachable!()
127        };
128
129        let desc = match &plan.from {
130            SubscribeFrom::Id(_) => None,
131            SubscribeFrom::Query { desc, .. } => Some(desc.clone()),
132        };
133
134        // Create an OptimizerTrace instance to collect plans emitted when
135        // executing the optimizer pipeline.
136        let optimizer_trace = OptimizerTrace::new(stage.paths());
137
138        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
139            broken,
140            config,
141            format,
142            stage,
143            replan: None,
144            desc,
145            optimizer_trace,
146        });
147        let stage = return_if_err!(
148            self.subscribe_validate(ctx.session_mut(), plan, target_cluster, explain_ctx),
149            ctx
150        );
151        self.sequence_staged(ctx, Span::current(), stage).await;
152    }
153
154    #[instrument]
155    fn subscribe_validate(
156        &self,
157        session: &mut Session,
158        plan: plan::SubscribePlan,
159        target_cluster: TargetCluster,
160        explain_ctx: ExplainContext,
161    ) -> Result<SubscribeStage, AdapterError> {
162        let plan::SubscribePlan { from, when, .. } = &plan;
163
164        let cluster = self
165            .catalog()
166            .resolve_target_cluster(target_cluster, session)?;
167        let cluster_id = cluster.id;
168
169        // Only check cluster replicas if we're not in explain mode.
170        if explain_ctx.needs_cluster() && cluster.replicas().next().is_none() {
171            return Err(AdapterError::NoClusterReplicasAvailable {
172                name: cluster.name.clone(),
173                is_managed: cluster.is_managed(),
174            });
175        }
176
177        let mut replica_id = session
178            .vars()
179            .cluster_replica()
180            .map(|name| {
181                cluster
182                    .replica_id(name)
183                    .ok_or(AdapterError::UnknownClusterReplica {
184                        cluster_name: cluster.name.clone(),
185                        replica_name: name.to_string(),
186                    })
187            })
188            .transpose()?;
189
190        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
191        // timestamp semantics.
192        if explain_ctx.needs_cluster() && when == &QueryWhen::Immediately {
193            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
194            // only operation.
195            session.add_transaction_ops(TransactionOps::Subscribe)?;
196        }
197
198        let depends_on = from.depends_on();
199
200        // Run `check_log_reads` and emit notices.
201        let notices = check_log_reads(
202            self.catalog(),
203            cluster,
204            &depends_on,
205            &mut replica_id,
206            session.vars(),
207        )?;
208        session.add_notices(notices);
209
210        // Determine timeline.
211        let mut timeline = self
212            .catalog()
213            .validate_timeline_context(depends_on.iter().copied())?;
214        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
215            // If the from IDs are timestamp independent but the query contains temporal functions
216            // then the timeline context needs to be upgraded to timestamp dependent.
217            timeline = TimelineContext::TimestampDependent;
218        }
219
220        let dependencies = depends_on
221            .iter()
222            .map(|id| self.catalog().resolve_item_id(id))
223            .collect();
224        let validity = PlanValidity::new(
225            self.catalog().transient_revision(),
226            dependencies,
227            Some(cluster_id),
228            replica_id,
229            session.role_metadata().clone(),
230        );
231
232        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
233            validity,
234            plan,
235            timeline,
236            dependency_ids: depends_on,
237            cluster_id,
238            replica_id,
239            explain_ctx,
240        }))
241    }
242
243    #[instrument]
244    fn subscribe_optimize_mir(
245        &self,
246        SubscribeOptimizeMir {
247            mut validity,
248            plan,
249            timeline,
250            dependency_ids,
251            cluster_id,
252            replica_id,
253            explain_ctx,
254        }: SubscribeOptimizeMir,
255    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
256        let plan::SubscribePlan {
257            with_snapshot,
258            up_to,
259            ..
260        } = &plan;
261
262        // Collect optimizer parameters.
263        let compute_instance = self
264            .instance_snapshot(cluster_id)
265            .expect("compute instance does not exist");
266        let (_, view_id) = self.allocate_transient_id();
267        let (_, sink_id) = self.allocate_transient_id();
268        let debug_name = format!("subscribe-{}", sink_id);
269        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
270            .override_from(&self.catalog.get_cluster(cluster_id).config.features())
271            .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id))
272            .override_from(&explain_ctx);
273
274        // Build an optimizer for this SUBSCRIBE.
275        let mut optimizer = optimize::subscribe::Optimizer::new(
276            self.owned_catalog(),
277            compute_instance,
278            view_id,
279            sink_id,
280            *with_snapshot,
281            *up_to,
282            debug_name,
283            optimizer_config,
284            self.optimizer_metrics(),
285        );
286        let catalog = self.owned_catalog();
287
288        let span = Span::current();
289        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
290            || "optimize subscribe (mir)",
291            move || {
292                span.in_scope(|| {
293                    let _dispatch_guard = explain_ctx.dispatch_guard();
294
295                    // MIR ⇒ MIR optimization (global)
296                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
297                    // Add introduced indexes as validity dependencies.
298                    validity.extend_dependencies(
299                        global_mir_plan
300                            .id_bundle(optimizer.cluster_id())
301                            .iter()
302                            .map(|id| catalog.resolve_item_id(&id)),
303                    );
304
305                    let stage =
306                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
307                            validity,
308                            plan,
309                            timeline,
310                            optimizer,
311                            global_mir_plan,
312                            dependency_ids,
313                            replica_id,
314                            explain_ctx,
315                        });
316                    Ok(Box::new(stage))
317                })
318            },
319        )))
320    }
321
322    #[instrument]
323    async fn subscribe_timestamp_optimize_lir(
324        &mut self,
325        ctx: &ExecuteContext,
326        SubscribeTimestampOptimizeLir {
327            validity,
328            plan,
329            timeline,
330            mut optimizer,
331            global_mir_plan,
332            dependency_ids,
333            replica_id,
334            explain_ctx,
335        }: SubscribeTimestampOptimizeLir,
336    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
337        let plan::SubscribePlan { when, .. } = &plan;
338
339        // Timestamp selection
340        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
341        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
342        let (determination, read_holds) = self.determine_timestamp(
343            ctx.session(),
344            bundle,
345            when,
346            optimizer.cluster_id(),
347            &timeline,
348            oracle_read_ts,
349            None,
350        )?;
351
352        let as_of = determination.timestamp_context.timestamp_or_default();
353
354        if let Some(id) = ctx.extra().contents() {
355            self.set_statement_execution_timestamp(id, as_of);
356        }
357        if let Some(up_to) = optimizer.up_to() {
358            if as_of == up_to {
359                ctx.session()
360                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
361            } else if as_of > up_to {
362                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
363            }
364        }
365
366        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
367
368        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
369
370        // Optimize LIR
371        let span = Span::current();
372        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
373            || "optimize subscribe (lir)",
374            move || {
375                span.in_scope(|| {
376                    let _dispatch_guard = explain_ctx.dispatch_guard();
377
378                    let cluster_id = optimizer.cluster_id();
379
380                    let mut pipeline = || -> Result<_, AdapterError> {
381                        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
382                        let global_lir_plan =
383                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
384                        Ok(global_lir_plan)
385                    };
386
387                    let stage = match pipeline() {
388                        Ok(global_lir_plan) => {
389                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
390                                let (_, df_meta) = global_lir_plan.unapply();
391                                SubscribeStage::Explain(SubscribeExplain {
392                                    validity,
393                                    optimizer,
394                                    df_meta,
395                                    cluster_id,
396                                    explain_ctx,
397                                })
398                            } else {
399                                SubscribeStage::Finish(SubscribeFinish {
400                                    validity,
401                                    cluster_id,
402                                    plan,
403                                    global_lir_plan,
404                                    dependency_ids,
405                                    replica_id,
406                                })
407                            }
408                        }
409                        Err(err) => {
410                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
411                                return Err(err);
412                            };
413
414                            if explain_ctx.broken {
415                                tracing::error!("error while handling EXPLAIN statement: {}", err);
416                                SubscribeStage::Explain(SubscribeExplain {
417                                    validity,
418                                    optimizer,
419                                    df_meta: Default::default(),
420                                    cluster_id,
421                                    explain_ctx,
422                                })
423                            } else {
424                                return Err(err);
425                            }
426                        }
427                    };
428
429                    Ok(Box::new(stage))
430                })
431            },
432        )))
433    }
434
435    #[instrument]
436    async fn subscribe_finish(
437        &mut self,
438        ctx: &mut ExecuteContext,
439        SubscribeFinish {
440            validity: _,
441            cluster_id,
442            plan,
443            global_lir_plan,
444            dependency_ids,
445            replica_id,
446        }: SubscribeFinish,
447    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
448        let (df_desc, df_meta) = global_lir_plan.unapply();
449        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
450        let conn_id = ctx.session.conn_id().clone();
451        let session_uuid = ctx.session().uuid();
452        let txn_read_holds = self
453            .txn_read_holds
454            .remove(&conn_id)
455            .expect("must have previously installed read holds");
456        let resp = self
457            .implement_subscribe(
458                ctx.extra_mut(),
459                df_desc,
460                dependency_ids,
461                cluster_id,
462                replica_id,
463                conn_id,
464                session_uuid,
465                txn_read_holds,
466                plan,
467            )
468            .await?;
469        Ok(StageResult::Response(resp))
470    }
471
472    #[instrument]
473    pub(crate) async fn implement_subscribe(
474        &mut self,
475        ctx_extra: &mut ExecuteContextGuard,
476        df_desc: DataflowDescription<Plan>,
477        dependency_ids: BTreeSet<GlobalId>,
478        cluster_id: ComputeInstanceId,
479        replica_id: Option<ReplicaId>,
480        conn_id: ConnectionId,
481        session_uuid: Uuid,
482        read_holds: ReadHolds,
483        plan: plan::SubscribePlan,
484    ) -> Result<ExecuteResponse, AdapterError> {
485        let sink_id = df_desc.sink_id();
486
487        let (tx, rx) = mpsc::unbounded_channel();
488        let active_subscribe = ActiveSubscribe {
489            conn_id: conn_id.clone(),
490            session_uuid,
491            channel: tx,
492            emit_progress: plan.emit_progress,
493            as_of: df_desc
494                .as_of
495                .as_ref()
496                .and_then(|t| t.as_option())
497                .copied()
498                .expect("set to Some in an earlier stage"),
499            arity: df_desc
500                .sink_exports
501                .values()
502                .into_element()
503                .from_desc
504                .arity(),
505            cluster_id,
506            depends_on: dependency_ids,
507            start_time: self.now(),
508            output: plan.output,
509            internal: false,
510        };
511        active_subscribe.initialize();
512
513        // Add metadata for the new SUBSCRIBE.
514        let write_notify_fut = self
515            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
516            .await;
517        // Ship dataflow.
518        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
519
520        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
521        // requests to external services, which can take time, so we run them concurrently.
522        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
523
524        // Explicitly drop read holds, just to make it obvious what's happening.
525        drop(read_holds);
526
527        let resp = ExecuteResponse::Subscribing {
528            rx,
529            ctx_extra: std::mem::take(ctx_extra),
530            instance_id: cluster_id,
531        };
532        let resp = match plan.copy_to {
533            None => resp,
534            Some(format) => ExecuteResponse::CopyTo {
535                format,
536                resp: Box::new(resp),
537            },
538        };
539        Ok(resp)
540    }
541
542    #[instrument]
543    async fn subscribe_explain(
544        &self,
545        session: &Session,
546        SubscribeExplain {
547            optimizer,
548            df_meta,
549            cluster_id,
550            explain_ctx:
551                ExplainPlanContext {
552                    config,
553                    format,
554                    stage,
555                    optimizer_trace,
556                    desc,
557                    ..
558                },
559            ..
560        }: SubscribeExplain,
561    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
562        let session_catalog = self.catalog().for_session(session);
563
564        let expr_humanizer = {
565            let transient_items = btreemap! {
566                optimizer.sink_id() => TransientItem::new(
567                    Some(vec![GlobalId::Explain.to_string()]),
568                    desc.map(|d| d.iter_names().map(|c| c.to_string()).collect()),
569                )
570            };
571            ExprHumanizerExt::new(transient_items, &session_catalog)
572        };
573
574        let target_cluster = self.catalog().get_cluster(cluster_id);
575
576        let features = OptimizerFeatures::from(self.catalog().system_config())
577            .override_from(&target_cluster.config.features())
578            .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id))
579            .override_from(&config.features);
580
581        let rows = optimizer_trace
582            .into_rows(
583                format,
584                &config,
585                &features,
586                &expr_humanizer,
587                None,
588                Some(target_cluster),
589                df_meta,
590                stage,
591                plan::ExplaineeStatementKind::Subscribe,
592                None,
593            )
594            .await?;
595
596        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
597    }
598}