1use maplit::btreemap;
11use mz_adapter_types::connection::ConnectionId;
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_compute_types::dataflows::DataflowDescription;
15use mz_compute_types::plan::Plan;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::GlobalId;
19use mz_repr::explain::{ExprHumanizerExt, TransientItem};
20use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
21use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
22use mz_sql::session::metadata::SessionMetadata;
23use std::collections::BTreeSet;
24use timely::progress::Antichain;
25use tokio::sync::mpsc;
26use tracing::Span;
27use uuid::Uuid;
28
29use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
30use crate::command::ExecuteResponse;
31use crate::coord::sequencer::inner::return_if_err;
32use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
33use crate::coord::{
34 Coordinator, ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
35 SubscribeExplain, SubscribeFinish, SubscribeOptimizeMir, SubscribeStage,
36 SubscribeTimestampOptimizeLir, TargetCluster,
37};
38use crate::error::AdapterError;
39use crate::explain::optimizer_trace::OptimizerTrace;
40use crate::optimize::Optimize;
41use crate::session::{Session, TransactionOps};
42use crate::{
43 AdapterNotice, ExecuteContext, ExecuteContextGuard, ReadHolds, TimelineContext, optimize,
44};
45
46impl Staged for SubscribeStage {
47 type Ctx = ExecuteContext;
48
49 fn validity(&mut self) -> &mut PlanValidity {
50 match self {
51 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
52 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
53 SubscribeStage::Finish(stage) => &mut stage.validity,
54 SubscribeStage::Explain(stage) => &mut stage.validity,
55 }
56 }
57
58 async fn stage(
59 self,
60 coord: &mut Coordinator,
61 ctx: &mut ExecuteContext,
62 ) -> Result<StageResult<Box<Self>>, AdapterError> {
63 match self {
64 SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
65 SubscribeStage::TimestampOptimizeLir(stage) => {
66 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
67 }
68 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
69 SubscribeStage::Explain(stage) => coord.subscribe_explain(ctx.session(), stage).await,
70 }
71 }
72
73 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
74 Message::SubscribeStageReady {
75 ctx,
76 span,
77 stage: self,
78 }
79 }
80
81 fn cancel_enabled(&self) -> bool {
82 true
83 }
84}
85
86impl Coordinator {
87 #[instrument]
88 pub(crate) async fn sequence_subscribe(
89 &mut self,
90 mut ctx: ExecuteContext,
91 plan: plan::SubscribePlan,
92 target_cluster: TargetCluster,
93 ) {
94 let stage = return_if_err!(
95 self.subscribe_validate(
96 ctx.session_mut(),
97 plan,
98 target_cluster,
99 ExplainContext::None
100 ),
101 ctx
102 );
103 self.sequence_staged(ctx, Span::current(), stage).await;
104 }
105
106 #[instrument]
107 pub(crate) async fn explain_subscribe(
108 &mut self,
109 mut ctx: ExecuteContext,
110 plan::ExplainPlanPlan {
111 stage,
112 format,
113 config,
114 explainee,
115 }: plan::ExplainPlanPlan,
116 target_cluster: TargetCluster,
117 ) {
118 let plan::Explainee::Statement(stmt) = explainee else {
119 unreachable!()
122 };
123 let plan::ExplaineeStatement::Subscribe { broken, plan } = stmt else {
124 unreachable!()
127 };
128
129 let desc = match &plan.from {
130 SubscribeFrom::Id(_) => None,
131 SubscribeFrom::Query { desc, .. } => Some(desc.clone()),
132 };
133
134 let optimizer_trace = OptimizerTrace::new(stage.paths());
137
138 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
139 broken,
140 config,
141 format,
142 stage,
143 replan: None,
144 desc,
145 optimizer_trace,
146 });
147 let stage = return_if_err!(
148 self.subscribe_validate(ctx.session_mut(), plan, target_cluster, explain_ctx),
149 ctx
150 );
151 self.sequence_staged(ctx, Span::current(), stage).await;
152 }
153
154 #[instrument]
155 fn subscribe_validate(
156 &self,
157 session: &mut Session,
158 plan: plan::SubscribePlan,
159 target_cluster: TargetCluster,
160 explain_ctx: ExplainContext,
161 ) -> Result<SubscribeStage, AdapterError> {
162 let plan::SubscribePlan { from, when, .. } = &plan;
163
164 let cluster = self
165 .catalog()
166 .resolve_target_cluster(target_cluster, session)?;
167 let cluster_id = cluster.id;
168
169 if explain_ctx.needs_cluster() && cluster.replicas().next().is_none() {
171 return Err(AdapterError::NoClusterReplicasAvailable {
172 name: cluster.name.clone(),
173 is_managed: cluster.is_managed(),
174 });
175 }
176
177 let mut replica_id = session
178 .vars()
179 .cluster_replica()
180 .map(|name| {
181 cluster
182 .replica_id(name)
183 .ok_or(AdapterError::UnknownClusterReplica {
184 cluster_name: cluster.name.clone(),
185 replica_name: name.to_string(),
186 })
187 })
188 .transpose()?;
189
190 if explain_ctx.needs_cluster() && when == &QueryWhen::Immediately {
193 session.add_transaction_ops(TransactionOps::Subscribe)?;
196 }
197
198 let depends_on = from.depends_on();
199
200 let notices = check_log_reads(
202 self.catalog(),
203 cluster,
204 &depends_on,
205 &mut replica_id,
206 session.vars(),
207 )?;
208 session.add_notices(notices);
209
210 let mut timeline = self
212 .catalog()
213 .validate_timeline_context(depends_on.iter().copied())?;
214 if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
215 timeline = TimelineContext::TimestampDependent;
218 }
219
220 let dependencies = depends_on
221 .iter()
222 .map(|id| self.catalog().resolve_item_id(id))
223 .collect();
224 let validity = PlanValidity::new(
225 self.catalog().transient_revision(),
226 dependencies,
227 Some(cluster_id),
228 replica_id,
229 session.role_metadata().clone(),
230 );
231
232 Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
233 validity,
234 plan,
235 timeline,
236 dependency_ids: depends_on,
237 cluster_id,
238 replica_id,
239 explain_ctx,
240 }))
241 }
242
243 #[instrument]
244 fn subscribe_optimize_mir(
245 &self,
246 SubscribeOptimizeMir {
247 mut validity,
248 plan,
249 timeline,
250 dependency_ids,
251 cluster_id,
252 replica_id,
253 explain_ctx,
254 }: SubscribeOptimizeMir,
255 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
256 let plan::SubscribePlan {
257 with_snapshot,
258 up_to,
259 ..
260 } = &plan;
261
262 let compute_instance = self
264 .instance_snapshot(cluster_id)
265 .expect("compute instance does not exist");
266 let (_, view_id) = self.allocate_transient_id();
267 let (_, sink_id) = self.allocate_transient_id();
268 let debug_name = format!("subscribe-{}", sink_id);
269 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
270 .override_from(&self.catalog.get_cluster(cluster_id).config.features())
271 .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id))
272 .override_from(&explain_ctx);
273
274 let mut optimizer = optimize::subscribe::Optimizer::new(
276 self.owned_catalog(),
277 compute_instance,
278 view_id,
279 sink_id,
280 *with_snapshot,
281 *up_to,
282 debug_name,
283 optimizer_config,
284 self.optimizer_metrics(),
285 );
286 let catalog = self.owned_catalog();
287
288 let span = Span::current();
289 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
290 || "optimize subscribe (mir)",
291 move || {
292 span.in_scope(|| {
293 let _dispatch_guard = explain_ctx.dispatch_guard();
294
295 let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
297 validity.extend_dependencies(
299 global_mir_plan
300 .id_bundle(optimizer.cluster_id())
301 .iter()
302 .map(|id| catalog.resolve_item_id(&id)),
303 );
304
305 let stage =
306 SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
307 validity,
308 plan,
309 timeline,
310 optimizer,
311 global_mir_plan,
312 dependency_ids,
313 replica_id,
314 explain_ctx,
315 });
316 Ok(Box::new(stage))
317 })
318 },
319 )))
320 }
321
322 #[instrument]
323 async fn subscribe_timestamp_optimize_lir(
324 &mut self,
325 ctx: &ExecuteContext,
326 SubscribeTimestampOptimizeLir {
327 validity,
328 plan,
329 timeline,
330 mut optimizer,
331 global_mir_plan,
332 dependency_ids,
333 replica_id,
334 explain_ctx,
335 }: SubscribeTimestampOptimizeLir,
336 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
337 let plan::SubscribePlan { when, .. } = &plan;
338
339 let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
341 let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
342 let (determination, read_holds) = self.determine_timestamp(
343 ctx.session(),
344 bundle,
345 when,
346 optimizer.cluster_id(),
347 &timeline,
348 oracle_read_ts,
349 None,
350 )?;
351
352 let as_of = determination.timestamp_context.timestamp_or_default();
353
354 if let Some(id) = ctx.extra().contents() {
355 self.set_statement_execution_timestamp(id, as_of);
356 }
357 if let Some(up_to) = optimizer.up_to() {
358 if as_of == up_to {
359 ctx.session()
360 .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
361 } else if as_of > up_to {
362 return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
363 }
364 }
365
366 self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
367
368 let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
369
370 let span = Span::current();
372 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
373 || "optimize subscribe (lir)",
374 move || {
375 span.in_scope(|| {
376 let _dispatch_guard = explain_ctx.dispatch_guard();
377
378 let cluster_id = optimizer.cluster_id();
379
380 let mut pipeline = || -> Result<_, AdapterError> {
381 let global_lir_plan =
383 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
384 Ok(global_lir_plan)
385 };
386
387 let stage = match pipeline() {
388 Ok(global_lir_plan) => {
389 if let ExplainContext::Plan(explain_ctx) = explain_ctx {
390 let (_, df_meta) = global_lir_plan.unapply();
391 SubscribeStage::Explain(SubscribeExplain {
392 validity,
393 optimizer,
394 df_meta,
395 cluster_id,
396 explain_ctx,
397 })
398 } else {
399 SubscribeStage::Finish(SubscribeFinish {
400 validity,
401 cluster_id,
402 plan,
403 global_lir_plan,
404 dependency_ids,
405 replica_id,
406 })
407 }
408 }
409 Err(err) => {
410 let ExplainContext::Plan(explain_ctx) = explain_ctx else {
411 return Err(err);
412 };
413
414 if explain_ctx.broken {
415 tracing::error!("error while handling EXPLAIN statement: {}", err);
416 SubscribeStage::Explain(SubscribeExplain {
417 validity,
418 optimizer,
419 df_meta: Default::default(),
420 cluster_id,
421 explain_ctx,
422 })
423 } else {
424 return Err(err);
425 }
426 }
427 };
428
429 Ok(Box::new(stage))
430 })
431 },
432 )))
433 }
434
435 #[instrument]
436 async fn subscribe_finish(
437 &mut self,
438 ctx: &mut ExecuteContext,
439 SubscribeFinish {
440 validity: _,
441 cluster_id,
442 plan,
443 global_lir_plan,
444 dependency_ids,
445 replica_id,
446 }: SubscribeFinish,
447 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
448 let (df_desc, df_meta) = global_lir_plan.unapply();
449 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
450 let conn_id = ctx.session.conn_id().clone();
451 let session_uuid = ctx.session().uuid();
452 let txn_read_holds = self
453 .txn_read_holds
454 .remove(&conn_id)
455 .expect("must have previously installed read holds");
456 let resp = self
457 .implement_subscribe(
458 ctx.extra_mut(),
459 df_desc,
460 dependency_ids,
461 cluster_id,
462 replica_id,
463 conn_id,
464 session_uuid,
465 txn_read_holds,
466 plan,
467 )
468 .await?;
469 Ok(StageResult::Response(resp))
470 }
471
472 #[instrument]
473 pub(crate) async fn implement_subscribe(
474 &mut self,
475 ctx_extra: &mut ExecuteContextGuard,
476 df_desc: DataflowDescription<Plan>,
477 dependency_ids: BTreeSet<GlobalId>,
478 cluster_id: ComputeInstanceId,
479 replica_id: Option<ReplicaId>,
480 conn_id: ConnectionId,
481 session_uuid: Uuid,
482 read_holds: ReadHolds,
483 plan: plan::SubscribePlan,
484 ) -> Result<ExecuteResponse, AdapterError> {
485 let sink_id = df_desc.sink_id();
486
487 let (tx, rx) = mpsc::unbounded_channel();
488 let active_subscribe = ActiveSubscribe {
489 conn_id: conn_id.clone(),
490 session_uuid,
491 channel: tx,
492 emit_progress: plan.emit_progress,
493 as_of: df_desc
494 .as_of
495 .as_ref()
496 .and_then(|t| t.as_option())
497 .copied()
498 .expect("set to Some in an earlier stage"),
499 arity: df_desc
500 .sink_exports
501 .values()
502 .into_element()
503 .from_desc
504 .arity(),
505 cluster_id,
506 depends_on: dependency_ids,
507 start_time: self.now(),
508 output: plan.output,
509 internal: false,
510 };
511 active_subscribe.initialize();
512
513 let write_notify_fut = self
515 .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
516 .await;
517 let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
519
520 let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
523
524 drop(read_holds);
526
527 let resp = ExecuteResponse::Subscribing {
528 rx,
529 ctx_extra: std::mem::take(ctx_extra),
530 instance_id: cluster_id,
531 };
532 let resp = match plan.copy_to {
533 None => resp,
534 Some(format) => ExecuteResponse::CopyTo {
535 format,
536 resp: Box::new(resp),
537 },
538 };
539 Ok(resp)
540 }
541
542 #[instrument]
543 async fn subscribe_explain(
544 &self,
545 session: &Session,
546 SubscribeExplain {
547 optimizer,
548 df_meta,
549 cluster_id,
550 explain_ctx:
551 ExplainPlanContext {
552 config,
553 format,
554 stage,
555 optimizer_trace,
556 desc,
557 ..
558 },
559 ..
560 }: SubscribeExplain,
561 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
562 let session_catalog = self.catalog().for_session(session);
563
564 let expr_humanizer = {
565 let transient_items = btreemap! {
566 optimizer.sink_id() => TransientItem::new(
567 Some(vec![GlobalId::Explain.to_string()]),
568 desc.map(|d| d.iter_names().map(|c| c.to_string()).collect()),
569 )
570 };
571 ExprHumanizerExt::new(transient_items, &session_catalog)
572 };
573
574 let target_cluster = self.catalog().get_cluster(cluster_id);
575
576 let features = OptimizerFeatures::from(self.catalog().system_config())
577 .override_from(&target_cluster.config.features())
578 .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id))
579 .override_from(&config.features);
580
581 let rows = optimizer_trace
582 .into_rows(
583 format,
584 &config,
585 &features,
586 &expr_humanizer,
587 None,
588 Some(target_cluster),
589 df_meta,
590 stage,
591 plan::ExplaineeStatementKind::Subscribe,
592 None,
593 )
594 .await?;
595
596 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
597 }
598}