Skip to main content

mz_adapter/coord/sequencer/inner/
subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use maplit::btreemap;
11use mz_ore::instrument;
12use mz_repr::GlobalId;
13use mz_repr::explain::{ExprHumanizerExt, TransientItem};
14use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
15use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
16use mz_sql::session::metadata::SessionMetadata;
17use timely::progress::Antichain;
18use tokio::sync::mpsc;
19use tracing::Span;
20
21use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
22use crate::command::ExecuteResponse;
23use crate::coord::sequencer::inner::return_if_err;
24use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
25use crate::coord::{
26    Coordinator, ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
27    SubscribeExplain, SubscribeFinish, SubscribeOptimizeMir, SubscribeStage,
28    SubscribeTimestampOptimizeLir, TargetCluster,
29};
30use crate::error::AdapterError;
31use crate::explain::optimizer_trace::OptimizerTrace;
32use crate::optimize::Optimize;
33use crate::session::{Session, TransactionOps};
34use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
35
36impl Staged for SubscribeStage {
37    type Ctx = ExecuteContext;
38
39    fn validity(&mut self) -> &mut PlanValidity {
40        match self {
41            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
42            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
43            SubscribeStage::Finish(stage) => &mut stage.validity,
44            SubscribeStage::Explain(stage) => &mut stage.validity,
45        }
46    }
47
48    async fn stage(
49        self,
50        coord: &mut Coordinator,
51        ctx: &mut ExecuteContext,
52    ) -> Result<StageResult<Box<Self>>, AdapterError> {
53        match self {
54            SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
55            SubscribeStage::TimestampOptimizeLir(stage) => {
56                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
57            }
58            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
59            SubscribeStage::Explain(stage) => coord.subscribe_explain(ctx.session(), stage).await,
60        }
61    }
62
63    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
64        Message::SubscribeStageReady {
65            ctx,
66            span,
67            stage: self,
68        }
69    }
70
71    fn cancel_enabled(&self) -> bool {
72        true
73    }
74}
75
76impl Coordinator {
77    #[instrument]
78    pub(crate) async fn sequence_subscribe(
79        &mut self,
80        mut ctx: ExecuteContext,
81        plan: plan::SubscribePlan,
82        target_cluster: TargetCluster,
83    ) {
84        let stage = return_if_err!(
85            self.subscribe_validate(
86                ctx.session_mut(),
87                plan,
88                target_cluster,
89                ExplainContext::None
90            ),
91            ctx
92        );
93        self.sequence_staged(ctx, Span::current(), stage).await;
94    }
95
96    #[instrument]
97    pub(crate) async fn explain_subscribe(
98        &mut self,
99        mut ctx: ExecuteContext,
100        plan::ExplainPlanPlan {
101            stage,
102            format,
103            config,
104            explainee,
105        }: plan::ExplainPlanPlan,
106        target_cluster: TargetCluster,
107    ) {
108        let plan::Explainee::Statement(stmt) = explainee else {
109            // This is currently asserted in the `sequence_explain_plan` code that
110            // calls this method.
111            unreachable!()
112        };
113        let plan::ExplaineeStatement::Subscribe { broken, plan } = stmt else {
114            // This is currently asserted in the `sequence_explain_plan` code that
115            // calls this method.
116            unreachable!()
117        };
118
119        let desc = match &plan.from {
120            SubscribeFrom::Id(_) => None,
121            SubscribeFrom::Query { desc, .. } => Some(desc.clone()),
122        };
123
124        // Create an OptimizerTrace instance to collect plans emitted when
125        // executing the optimizer pipeline.
126        let optimizer_trace = OptimizerTrace::new(stage.paths());
127
128        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
129            broken,
130            config,
131            format,
132            stage,
133            replan: None,
134            desc,
135            optimizer_trace,
136        });
137        let stage = return_if_err!(
138            self.subscribe_validate(ctx.session_mut(), plan, target_cluster, explain_ctx),
139            ctx
140        );
141        self.sequence_staged(ctx, Span::current(), stage).await;
142    }
143
144    #[instrument]
145    fn subscribe_validate(
146        &self,
147        session: &mut Session,
148        plan: plan::SubscribePlan,
149        target_cluster: TargetCluster,
150        explain_ctx: ExplainContext,
151    ) -> Result<SubscribeStage, AdapterError> {
152        let plan::SubscribePlan { from, when, .. } = &plan;
153
154        let cluster = self
155            .catalog()
156            .resolve_target_cluster(target_cluster, session)?;
157        let cluster_id = cluster.id;
158
159        // Only check cluster replicas if we're not in explain mode.
160        if explain_ctx.needs_cluster() && cluster.replicas().next().is_none() {
161            return Err(AdapterError::NoClusterReplicasAvailable {
162                name: cluster.name.clone(),
163                is_managed: cluster.is_managed(),
164            });
165        }
166
167        let mut replica_id = session
168            .vars()
169            .cluster_replica()
170            .map(|name| {
171                cluster
172                    .replica_id(name)
173                    .ok_or(AdapterError::UnknownClusterReplica {
174                        cluster_name: cluster.name.clone(),
175                        replica_name: name.to_string(),
176                    })
177            })
178            .transpose()?;
179
180        // SUBSCRIBE AS OF, similar to peeks, doesn't need to worry about transaction
181        // timestamp semantics.
182        if when == &QueryWhen::Immediately {
183            // If this isn't a SUBSCRIBE AS OF, the SUBSCRIBE can be in a transaction if it's the
184            // only operation.
185            session.add_transaction_ops(TransactionOps::Subscribe)?;
186        }
187
188        let depends_on = from.depends_on();
189
190        // Run `check_log_reads` and emit notices.
191        let notices = check_log_reads(
192            self.catalog(),
193            cluster,
194            &depends_on,
195            &mut replica_id,
196            session.vars(),
197        )?;
198        session.add_notices(notices);
199
200        // Determine timeline.
201        let mut timeline = self
202            .catalog()
203            .validate_timeline_context(depends_on.iter().copied())?;
204        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
205            // If the from IDs are timestamp independent but the query contains temporal functions
206            // then the timeline context needs to be upgraded to timestamp dependent.
207            timeline = TimelineContext::TimestampDependent;
208        }
209
210        let dependencies = depends_on
211            .iter()
212            .map(|id| self.catalog().resolve_item_id(id))
213            .collect();
214        let validity = PlanValidity::new(
215            self.catalog().transient_revision(),
216            dependencies,
217            Some(cluster_id),
218            replica_id,
219            session.role_metadata().clone(),
220        );
221
222        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
223            validity,
224            plan,
225            timeline,
226            dependency_ids: depends_on,
227            cluster_id,
228            replica_id,
229            explain_ctx,
230        }))
231    }
232
233    #[instrument]
234    fn subscribe_optimize_mir(
235        &self,
236        SubscribeOptimizeMir {
237            mut validity,
238            plan,
239            timeline,
240            dependency_ids,
241            cluster_id,
242            replica_id,
243            explain_ctx,
244        }: SubscribeOptimizeMir,
245    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
246        let plan::SubscribePlan {
247            with_snapshot,
248            up_to,
249            ..
250        } = &plan;
251
252        // Collect optimizer parameters.
253        let compute_instance = self
254            .instance_snapshot(cluster_id)
255            .expect("compute instance does not exist");
256        let (_, view_id) = self.allocate_transient_id();
257        let (_, sink_id) = self.allocate_transient_id();
258        let debug_name = format!("subscribe-{}", sink_id);
259        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
260            .override_from(&self.catalog.get_cluster(cluster_id).config.features())
261            .override_from(&explain_ctx);
262
263        // Build an optimizer for this SUBSCRIBE.
264        let mut optimizer = optimize::subscribe::Optimizer::new(
265            self.owned_catalog(),
266            compute_instance,
267            view_id,
268            sink_id,
269            *with_snapshot,
270            *up_to,
271            debug_name,
272            optimizer_config,
273            self.optimizer_metrics(),
274        );
275        let catalog = self.owned_catalog();
276
277        let span = Span::current();
278        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
279            || "optimize subscribe (mir)",
280            move || {
281                span.in_scope(|| {
282                    let _dispatch_guard = explain_ctx.dispatch_guard();
283
284                    // MIR ⇒ MIR optimization (global)
285                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
286                    // Add introduced indexes as validity dependencies.
287                    validity.extend_dependencies(
288                        global_mir_plan
289                            .id_bundle(optimizer.cluster_id())
290                            .iter()
291                            .map(|id| catalog.resolve_item_id(&id)),
292                    );
293
294                    let stage =
295                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
296                            validity,
297                            plan,
298                            timeline,
299                            optimizer,
300                            global_mir_plan,
301                            dependency_ids,
302                            replica_id,
303                            explain_ctx,
304                        });
305                    Ok(Box::new(stage))
306                })
307            },
308        )))
309    }
310
311    #[instrument]
312    async fn subscribe_timestamp_optimize_lir(
313        &mut self,
314        ctx: &ExecuteContext,
315        SubscribeTimestampOptimizeLir {
316            validity,
317            plan,
318            timeline,
319            mut optimizer,
320            global_mir_plan,
321            dependency_ids,
322            replica_id,
323            explain_ctx,
324        }: SubscribeTimestampOptimizeLir,
325    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
326        let plan::SubscribePlan { when, .. } = &plan;
327
328        // Timestamp selection
329        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
330        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
331        let (determination, read_holds) = self.determine_timestamp(
332            ctx.session(),
333            bundle,
334            when,
335            optimizer.cluster_id(),
336            &timeline,
337            oracle_read_ts,
338            None,
339        )?;
340
341        let as_of = determination.timestamp_context.timestamp_or_default();
342
343        if let Some(id) = ctx.extra().contents() {
344            self.set_statement_execution_timestamp(id, as_of);
345        }
346        if let Some(up_to) = optimizer.up_to() {
347            if as_of == up_to {
348                ctx.session()
349                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
350            } else if as_of > up_to {
351                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
352            }
353        }
354
355        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
356
357        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
358
359        // Optimize LIR
360        let span = Span::current();
361        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
362            || "optimize subscribe (lir)",
363            move || {
364                span.in_scope(|| {
365                    let _dispatch_guard = explain_ctx.dispatch_guard();
366
367                    let cluster_id = optimizer.cluster_id();
368
369                    let mut pipeline = || -> Result<_, AdapterError> {
370                        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
371                        let global_lir_plan =
372                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
373                        Ok(global_lir_plan)
374                    };
375
376                    let stage = match pipeline() {
377                        Ok(global_lir_plan) => {
378                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
379                                let (_, df_meta) = global_lir_plan.unapply();
380                                SubscribeStage::Explain(SubscribeExplain {
381                                    validity,
382                                    optimizer,
383                                    df_meta,
384                                    cluster_id,
385                                    explain_ctx,
386                                })
387                            } else {
388                                SubscribeStage::Finish(SubscribeFinish {
389                                    validity,
390                                    cluster_id,
391                                    plan,
392                                    global_lir_plan,
393                                    dependency_ids,
394                                    replica_id,
395                                })
396                            }
397                        }
398                        Err(err) => {
399                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
400                                return Err(err);
401                            };
402
403                            if explain_ctx.broken {
404                                tracing::error!("error while handling EXPLAIN statement: {}", err);
405                                SubscribeStage::Explain(SubscribeExplain {
406                                    validity,
407                                    optimizer,
408                                    df_meta: Default::default(),
409                                    cluster_id,
410                                    explain_ctx,
411                                })
412                            } else {
413                                return Err(err);
414                            }
415                        }
416                    };
417
418                    Ok(Box::new(stage))
419                })
420            },
421        )))
422    }
423
424    #[instrument]
425    async fn subscribe_finish(
426        &mut self,
427        ctx: &mut ExecuteContext,
428        SubscribeFinish {
429            validity: _,
430            cluster_id,
431            plan:
432                plan::SubscribePlan {
433                    copy_to,
434                    emit_progress,
435                    output,
436                    ..
437                },
438            global_lir_plan,
439            dependency_ids,
440            replica_id,
441        }: SubscribeFinish,
442    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
443        let sink_id = global_lir_plan.sink_id();
444
445        let (tx, rx) = mpsc::unbounded_channel();
446        let active_subscribe = ActiveSubscribe {
447            conn_id: ctx.session().conn_id().clone(),
448            session_uuid: ctx.session().uuid(),
449            channel: tx,
450            emit_progress,
451            as_of: global_lir_plan
452                .as_of()
453                .expect("set to Some in an earlier stage"),
454            arity: global_lir_plan.sink_desc().from_desc.arity(),
455            cluster_id,
456            depends_on: dependency_ids,
457            start_time: self.now(),
458            output,
459        };
460        active_subscribe.initialize();
461
462        let (df_desc, df_meta) = global_lir_plan.unapply();
463
464        // Emit notices.
465        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
466
467        // Add metadata for the new SUBSCRIBE.
468        let write_notify_fut = self
469            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
470            .await;
471        // Ship dataflow.
472        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
473
474        // Both adding metadata for the new SUBSCRIBE and shipping the underlying dataflow, send
475        // requests to external services, which can take time, so we run them concurrently.
476        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
477
478        // Release the pre-optimization read holds because the controller is now handling those.
479        let txn_read_holds = self
480            .txn_read_holds
481            .remove(ctx.session().conn_id())
482            .expect("must have previously installed read holds");
483
484        // Explicitly drop read holds, just to make it obvious what's happening.
485        drop(txn_read_holds);
486
487        let resp = ExecuteResponse::Subscribing {
488            rx,
489            ctx_extra: std::mem::take(ctx.extra_mut()),
490            instance_id: cluster_id,
491        };
492        let resp = match copy_to {
493            None => resp,
494            Some(format) => ExecuteResponse::CopyTo {
495                format,
496                resp: Box::new(resp),
497            },
498        };
499        Ok(StageResult::Response(resp))
500    }
501
502    #[instrument]
503    async fn subscribe_explain(
504        &self,
505        session: &Session,
506        SubscribeExplain {
507            optimizer,
508            df_meta,
509            cluster_id,
510            explain_ctx:
511                ExplainPlanContext {
512                    config,
513                    format,
514                    stage,
515                    optimizer_trace,
516                    desc,
517                    ..
518                },
519            ..
520        }: SubscribeExplain,
521    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
522        let session_catalog = self.catalog().for_session(session);
523
524        let expr_humanizer = {
525            let transient_items = btreemap! {
526                optimizer.sink_id() => TransientItem::new(
527                    Some(vec![GlobalId::Explain.to_string()]),
528                    desc.map(|d| d.iter_names().map(|c| c.to_string()).collect()),
529                )
530            };
531            ExprHumanizerExt::new(transient_items, &session_catalog)
532        };
533
534        let target_cluster = self.catalog().get_cluster(cluster_id);
535
536        let features = OptimizerFeatures::from(self.catalog().system_config())
537            .override_from(&target_cluster.config.features())
538            .override_from(&config.features);
539
540        let rows = optimizer_trace
541            .into_rows(
542                format,
543                &config,
544                &features,
545                &expr_humanizer,
546                None,
547                Some(target_cluster),
548                df_meta,
549                stage,
550                plan::ExplaineeStatementKind::Subscribe,
551                None,
552            )
553            .await?;
554
555        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
556    }
557}