1use maplit::btreemap;
11use mz_adapter_types::connection::ConnectionId;
12use mz_cluster_client::ReplicaId;
13use mz_compute_types::ComputeInstanceId;
14use mz_compute_types::dataflows::DataflowDescription;
15use mz_compute_types::plan::Plan;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::GlobalId;
19use mz_repr::explain::{ExprHumanizerExt, TransientItem};
20use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
21use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
22use mz_sql::session::metadata::SessionMetadata;
23use std::collections::BTreeSet;
24use timely::progress::Antichain;
25use tokio::sync::mpsc;
26use tracing::Span;
27use uuid::Uuid;
28
29use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
30use crate::command::ExecuteResponse;
31use crate::coord::sequencer::inner::return_if_err;
32use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
33use crate::coord::{
34 Coordinator, ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
35 SubscribeExplain, SubscribeFinish, SubscribeOptimizeMir, SubscribeStage,
36 SubscribeTimestampOptimizeLir, TargetCluster,
37};
38use crate::error::AdapterError;
39use crate::explain::optimizer_trace::OptimizerTrace;
40use crate::optimize::Optimize;
41use crate::session::{Session, TransactionOps};
42use crate::{
43 AdapterNotice, ExecuteContext, ExecuteContextGuard, ReadHolds, TimelineContext, optimize,
44};
45
46impl Staged for SubscribeStage {
47 type Ctx = ExecuteContext;
48
49 fn validity(&mut self) -> &mut PlanValidity {
50 match self {
51 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
52 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
53 SubscribeStage::Finish(stage) => &mut stage.validity,
54 SubscribeStage::Explain(stage) => &mut stage.validity,
55 }
56 }
57
58 async fn stage(
59 self,
60 coord: &mut Coordinator,
61 ctx: &mut ExecuteContext,
62 ) -> Result<StageResult<Box<Self>>, AdapterError> {
63 match self {
64 SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
65 SubscribeStage::TimestampOptimizeLir(stage) => {
66 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
67 }
68 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
69 SubscribeStage::Explain(stage) => coord.subscribe_explain(ctx.session(), stage).await,
70 }
71 }
72
73 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
74 Message::SubscribeStageReady {
75 ctx,
76 span,
77 stage: self,
78 }
79 }
80
81 fn cancel_enabled(&self) -> bool {
82 true
83 }
84}
85
86impl Coordinator {
87 #[instrument]
88 pub(crate) async fn sequence_subscribe(
89 &mut self,
90 mut ctx: ExecuteContext,
91 plan: plan::SubscribePlan,
92 target_cluster: TargetCluster,
93 ) {
94 let stage = return_if_err!(
95 self.subscribe_validate(
96 ctx.session_mut(),
97 plan,
98 target_cluster,
99 ExplainContext::None
100 ),
101 ctx
102 );
103 self.sequence_staged(ctx, Span::current(), stage).await;
104 }
105
106 #[instrument]
107 pub(crate) async fn explain_subscribe(
108 &mut self,
109 mut ctx: ExecuteContext,
110 plan::ExplainPlanPlan {
111 stage,
112 format,
113 config,
114 explainee,
115 }: plan::ExplainPlanPlan,
116 target_cluster: TargetCluster,
117 ) {
118 let plan::Explainee::Statement(stmt) = explainee else {
119 unreachable!()
122 };
123 let plan::ExplaineeStatement::Subscribe { broken, plan } = stmt else {
124 unreachable!()
127 };
128
129 let desc = match &plan.from {
130 SubscribeFrom::Id(_) => None,
131 SubscribeFrom::Query { desc, .. } => Some(desc.clone()),
132 };
133
134 let optimizer_trace = OptimizerTrace::new(stage.paths());
137
138 let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
139 broken,
140 config,
141 format,
142 stage,
143 replan: None,
144 desc,
145 optimizer_trace,
146 });
147 let stage = return_if_err!(
148 self.subscribe_validate(ctx.session_mut(), plan, target_cluster, explain_ctx),
149 ctx
150 );
151 self.sequence_staged(ctx, Span::current(), stage).await;
152 }
153
    /// Validates a `SUBSCRIBE` plan against the current catalog and session
    /// state and assembles the first pipeline stage
    /// ([`SubscribeStage::OptimizeMir`]).
    ///
    /// Checks performed here, in order:
    /// * the target cluster resolves and — when the context requires an
    ///   actual execution (`explain_ctx.needs_cluster()`) — has at least one
    ///   replica,
    /// * any session-pinned cluster replica exists on that cluster,
    /// * log reads are permitted (this may pin `replica_id`),
    /// * the dependency set has a consistent timeline context.
    ///
    /// The returned stage carries a [`PlanValidity`] built from the current
    /// catalog revision and the plan's dependencies, for revalidation in
    /// later (async) stages.
    #[instrument]
    fn subscribe_validate(
        &self,
        session: &mut Session,
        plan: plan::SubscribePlan,
        target_cluster: TargetCluster,
        explain_ctx: ExplainContext,
    ) -> Result<SubscribeStage, AdapterError> {
        let plan::SubscribePlan { from, when, .. } = &plan;

        let cluster = self
            .catalog()
            .resolve_target_cluster(target_cluster, session)?;
        let cluster_id = cluster.id;

        // A subscribe that will actually run needs at least one replica to
        // compute its results.
        if explain_ctx.needs_cluster() && cluster.replicas().next().is_none() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        // If the session pinned a specific replica, make sure it exists on
        // the resolved cluster.
        let mut replica_id = session
            .vars()
            .cluster_replica()
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        // Immediately-executing subscribes participate in the session's
        // transaction as a subscribe op.
        if when == &QueryWhen::Immediately {
            session.add_transaction_ops(TransactionOps::Subscribe)?;
        }

        let depends_on = from.depends_on();

        // Reading from cluster logs may force the subscribe onto a specific
        // replica and may generate user-facing notices.
        let notices = check_log_reads(
            self.catalog(),
            cluster,
            &depends_on,
            &mut replica_id,
            session.vars(),
        )?;
        session.add_notices(notices);

        let mut timeline = self
            .catalog()
            .validate_timeline_context(depends_on.iter().copied())?;
        // A query containing temporal expressions is timestamp dependent even
        // when all of its inputs are timestamp independent.
        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
            timeline = TimelineContext::TimestampDependent;
        }

        let dependencies = depends_on
            .iter()
            .map(|id| self.catalog().resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            dependencies,
            Some(cluster_id),
            replica_id,
            session.role_metadata().clone(),
        );

        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
            validity,
            plan,
            timeline,
            dependency_ids: depends_on,
            cluster_id,
            replica_id,
            explain_ctx,
        }))
    }
242
    /// Stage 1: optimizes the subscribe's plan to a global MIR plan.
    ///
    /// The potentially expensive optimization runs on a blocking task; the
    /// coordinator resumes this pipeline when the returned
    /// [`StageResult::Handle`] completes with the next stage.
    #[instrument]
    fn subscribe_optimize_mir(
        &self,
        SubscribeOptimizeMir {
            mut validity,
            plan,
            timeline,
            dependency_ids,
            cluster_id,
            replica_id,
            explain_ctx,
        }: SubscribeOptimizeMir,
    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
        let plan::SubscribePlan {
            with_snapshot,
            up_to,
            ..
        } = &plan;

        // Collect the optimizer's inputs: a snapshot of the compute instance,
        // fresh transient IDs for the subscribe's view and sink, and the
        // optimizer configuration (system defaults overridden by the cluster's
        // feature config and the EXPLAIN context).
        let compute_instance = self
            .instance_snapshot(cluster_id)
            .expect("compute instance does not exist");
        let (_, view_id) = self.allocate_transient_id();
        let (_, sink_id) = self.allocate_transient_id();
        let debug_name = format!("subscribe-{}", sink_id);
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster_id).config.features())
            .override_from(&explain_ctx);

        let mut optimizer = optimize::subscribe::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            view_id,
            sink_id,
            *with_snapshot,
            *up_to,
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
        );
        // Owned catalog handle moved into the blocking task below.
        let catalog = self.owned_catalog();

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize subscribe (mir)",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    // Run MIR optimization; optimizer panics are caught and
                    // surfaced as errors.
                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.clone())?;
                    // The optimized plan may reference additional catalog
                    // items; extend the validity set so later stages
                    // revalidate those too.
                    validity.extend_dependencies(
                        global_mir_plan
                            .id_bundle(optimizer.cluster_id())
                            .iter()
                            .map(|id| catalog.resolve_item_id(&id)),
                    );

                    let stage =
                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
                            validity,
                            plan,
                            timeline,
                            optimizer,
                            global_mir_plan,
                            dependency_ids,
                            replica_id,
                            explain_ctx,
                        });
                    Ok(Box::new(stage))
                })
            },
        )))
    }
320
    /// Stage 2: determines the subscribe's `as_of` timestamp and lowers the
    /// global MIR plan to LIR.
    ///
    /// The read holds protecting the chosen timestamp are stashed under the
    /// session's connection ID for [`Coordinator::subscribe_finish`] to pick
    /// up. For EXPLAIN runs the pipeline branches to
    /// [`SubscribeStage::Explain`] instead of [`SubscribeStage::Finish`].
    #[instrument]
    async fn subscribe_timestamp_optimize_lir(
        &mut self,
        ctx: &ExecuteContext,
        SubscribeTimestampOptimizeLir {
            validity,
            plan,
            timeline,
            mut optimizer,
            global_mir_plan,
            dependency_ids,
            replica_id,
            explain_ctx,
        }: SubscribeTimestampOptimizeLir,
    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
        let plan::SubscribePlan { when, .. } = &plan;

        // Timestamp selection: consult the timestamp oracle for the plan's
        // timeline, then determine the read timestamp (and read holds) for
        // the plan's id bundle.
        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
        let (determination, read_holds) = self.determine_timestamp(
            ctx.session(),
            bundle,
            when,
            optimizer.cluster_id(),
            &timeline,
            oracle_read_ts,
            None,
        )?;

        let as_of = determination.timestamp_context.timestamp_or_default();

        if let Some(id) = ctx.extra().contents() {
            self.set_statement_execution_timestamp(id, as_of);
        }
        // Validate the chosen `as_of` against a user-provided `UP TO` bound:
        // equal bounds only warrant a notice, while `as_of > up_to` is
        // rejected outright.
        if let Some(up_to) = optimizer.up_to() {
            if as_of == up_to {
                ctx.session()
                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
            } else if as_of > up_to {
                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
            }
        }

        // Keep the inputs readable at `as_of`; the finish stage removes these
        // holds again once the dataflow has been shipped.
        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);

        // Pin the plan to the chosen timestamp.
        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));

        // LIR lowering runs on a blocking task, like the MIR stage.
        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize subscribe (lir)",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let cluster_id = optimizer.cluster_id();

                    let mut pipeline = || -> Result<_, AdapterError> {
                        // MIR ⇒ LIR lowering; optimizer panics are caught and
                        // surfaced as errors.
                        let global_lir_plan =
                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
                        Ok(global_lir_plan)
                    };

                    let stage = match pipeline() {
                        Ok(global_lir_plan) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                // EXPLAIN: keep only the dataflow metadata for
                                // rendering; the dataflow itself is discarded.
                                let (_, df_meta) = global_lir_plan.unapply();
                                SubscribeStage::Explain(SubscribeExplain {
                                    validity,
                                    optimizer,
                                    df_meta,
                                    cluster_id,
                                    explain_ctx,
                                })
                            } else {
                                SubscribeStage::Finish(SubscribeFinish {
                                    validity,
                                    cluster_id,
                                    plan,
                                    global_lir_plan,
                                    dependency_ids,
                                    replica_id,
                                })
                            }
                        }
                        Err(err) => {
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            // For a `broken` EXPLAIN the optimization error is
                            // logged and the explanation proceeds with empty
                            // dataflow metadata; otherwise it is propagated.
                            if explain_ctx.broken {
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                SubscribeStage::Explain(SubscribeExplain {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    cluster_id,
                                    explain_ctx,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };

                    Ok(Box::new(stage))
                })
            },
        )))
    }
433
434 #[instrument]
435 async fn subscribe_finish(
436 &mut self,
437 ctx: &mut ExecuteContext,
438 SubscribeFinish {
439 validity: _,
440 cluster_id,
441 plan,
442 global_lir_plan,
443 dependency_ids,
444 replica_id,
445 }: SubscribeFinish,
446 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
447 let (df_desc, df_meta) = global_lir_plan.unapply();
448 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
449 let conn_id = ctx.session.conn_id().clone();
450 let session_uuid = ctx.session().uuid();
451 let txn_read_holds = self
452 .txn_read_holds
453 .remove(&conn_id)
454 .expect("must have previously installed read holds");
455 let resp = self
456 .implement_subscribe(
457 ctx.extra_mut(),
458 df_desc,
459 dependency_ids,
460 cluster_id,
461 replica_id,
462 conn_id,
463 session_uuid,
464 txn_read_holds,
465 plan,
466 )
467 .await?;
468 Ok(StageResult::Response(resp))
469 }
470
    /// Installs an active subscribe for `conn_id` and ships its dataflow,
    /// returning the [`ExecuteResponse`] that streams results to the client.
    ///
    /// `read_holds` are released (dropped) only after the dataflow has been
    /// shipped.
    #[instrument]
    pub(crate) async fn implement_subscribe(
        &mut self,
        ctx_extra: &mut ExecuteContextGuard,
        df_desc: DataflowDescription<Plan>,
        dependency_ids: BTreeSet<GlobalId>,
        cluster_id: ComputeInstanceId,
        replica_id: Option<ReplicaId>,
        conn_id: ConnectionId,
        session_uuid: Uuid,
        read_holds: ReadHolds<mz_repr::Timestamp>,
        plan: plan::SubscribePlan,
    ) -> Result<ExecuteResponse, AdapterError> {
        let sink_id = df_desc.sink_id();

        // Channel over which the active subscribe sends result batches; the
        // receiving half is embedded in the response returned to the client.
        let (tx, rx) = mpsc::unbounded_channel();
        let active_subscribe = ActiveSubscribe {
            conn_id: conn_id.clone(),
            session_uuid,
            channel: tx,
            emit_progress: plan.emit_progress,
            // The `as_of` was fixed when the plan was resolved in the
            // timestamp-selection stage, so it must be present here.
            as_of: df_desc
                .as_of
                .as_ref()
                .and_then(|t| t.as_option())
                .copied()
                .expect("set to Some in an earlier stage"),
            arity: df_desc
                .sink_exports
                .values()
                .into_element()
                .from_desc
                .arity(),
            cluster_id,
            depends_on: dependency_ids,
            start_time: self.now(),
            output: plan.output,
        };
        active_subscribe.initialize();

        let write_notify_fut = self
            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
            .await;
        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);

        // Run both futures to completion concurrently.
        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;

        // The dataflow is shipped, so the temporary read holds taken during
        // timestamp selection can be released.
        drop(read_holds);

        let resp = ExecuteResponse::Subscribing {
            rx,
            ctx_extra: std::mem::take(ctx_extra),
            instance_id: cluster_id,
        };
        // If the plan requests a COPY output, wrap the subscribe response in
        // a copy response carrying the requested format.
        let resp = match plan.copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(resp)
    }
539
540 #[instrument]
541 async fn subscribe_explain(
542 &self,
543 session: &Session,
544 SubscribeExplain {
545 optimizer,
546 df_meta,
547 cluster_id,
548 explain_ctx:
549 ExplainPlanContext {
550 config,
551 format,
552 stage,
553 optimizer_trace,
554 desc,
555 ..
556 },
557 ..
558 }: SubscribeExplain,
559 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
560 let session_catalog = self.catalog().for_session(session);
561
562 let expr_humanizer = {
563 let transient_items = btreemap! {
564 optimizer.sink_id() => TransientItem::new(
565 Some(vec![GlobalId::Explain.to_string()]),
566 desc.map(|d| d.iter_names().map(|c| c.to_string()).collect()),
567 )
568 };
569 ExprHumanizerExt::new(transient_items, &session_catalog)
570 };
571
572 let target_cluster = self.catalog().get_cluster(cluster_id);
573
574 let features = OptimizerFeatures::from(self.catalog().system_config())
575 .override_from(&target_cluster.config.features())
576 .override_from(&config.features);
577
578 let rows = optimizer_trace
579 .into_rows(
580 format,
581 &config,
582 &features,
583 &expr_humanizer,
584 None,
585 Some(target_cluster),
586 df_meta,
587 stage,
588 plan::ExplaineeStatementKind::Subscribe,
589 None,
590 )
591 .await?;
592
593 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
594 }
595}