use maplit::btreemap;
use mz_ore::instrument;
use mz_repr::GlobalId;
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_sql::plan::{self, QueryWhen, SubscribeFrom};
use mz_sql::session::metadata::SessionMetadata;
use timely::progress::Antichain;
use tokio::sync::mpsc;
use tracing::Span;

use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
use crate::command::ExecuteResponse;
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
use crate::coord::{
    Coordinator, ExplainContext, ExplainPlanContext, Message, PlanValidity, StageResult, Staged,
    SubscribeExplain, SubscribeFinish, SubscribeOptimizeMir, SubscribeStage,
    SubscribeTimestampOptimizeLir, TargetCluster,
};
use crate::error::AdapterError;
use crate::explain::optimizer_trace::OptimizerTrace;
use crate::optimize::Optimize;
use crate::session::{Session, TransactionOps};
use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};

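/// The staged execution pipeline for `SUBSCRIBE`.
///
/// Stages run in this order: `OptimizeMir`, then `TimestampOptimizeLir`, and
/// finally either `Finish` (for an actual `SUBSCRIBE`) or `Explain` (for
/// `EXPLAIN ... SUBSCRIBE`).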
impl Staged for SubscribeStage {
    type Ctx = ExecuteContext;

    fn validity(&mut self) -> &mut PlanValidity {
        match self {
            SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
            SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
            SubscribeStage::Finish(stage) => &mut stage.validity,
            SubscribeStage::Explain(stage) => &mut stage.validity,
        }
    }

    async fn stage(
        self,
        coord: &mut Coordinator,
        ctx: &mut ExecuteContext,
    ) -> Result<StageResult<Box<Self>>, AdapterError> {
        match self {
            SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
            SubscribeStage::TimestampOptimizeLir(stage) => {
                coord.subscribe_timestamp_optimize_lir(ctx, stage).await
            }
            SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
            SubscribeStage::Explain(stage) => coord.subscribe_explain(ctx.session(), stage).await,
        }
    }

    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
        Message::SubscribeStageReady {
            ctx,
            span,
            stage: self,
        }
    }

    fn cancel_enabled(&self) -> bool {
        true
    }
}

impl Coordinator {
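    /// Sequences a `SUBSCRIBE` statement: validates the plan and then drives the
    /// staged pipeline defined by [`SubscribeStage`].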
    #[instrument]
    pub(crate) async fn sequence_subscribe(
        &mut self,
        mut ctx: ExecuteContext,
        plan: plan::SubscribePlan,
        target_cluster: TargetCluster,
    ) {
        let stage = return_if_err!(
            self.subscribe_validate(
                ctx.session_mut(),
                plan,
                target_cluster,
                ExplainContext::None
            ),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

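    /// Sequences an `EXPLAIN` of a `SUBSCRIBE`: runs the same validation and
    /// optimization stages as [`Coordinator::sequence_subscribe`], but records an
    /// optimizer trace and renders it instead of creating a sink.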
    #[instrument]
    pub(crate) async fn explain_subscribe(
        &mut self,
        mut ctx: ExecuteContext,
        plan::ExplainPlanPlan {
            stage,
            format,
            config,
            explainee,
        }: plan::ExplainPlanPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::Explainee::Statement(stmt) = explainee else {
            unreachable!()
        };
        let plan::ExplaineeStatement::Subscribe { broken, plan } = stmt else {
            unreachable!()
        };

        let desc = match &plan.from {
            SubscribeFrom::Id(_) => None,
            SubscribeFrom::Query { desc, .. } => Some(desc.clone()),
        };

        let optimizer_trace = OptimizerTrace::new(stage.paths());

        let explain_ctx = ExplainContext::Plan(ExplainPlanContext {
            broken,
            config,
            format,
            stage,
            replan: None,
            desc,
            optimizer_trace,
        });
        let stage = return_if_err!(
            self.subscribe_validate(ctx.session_mut(), plan, target_cluster, explain_ctx),
            ctx
        );
        self.sequence_staged(ctx, Span::current(), stage).await;
    }

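    /// Validates a `SUBSCRIBE` plan against the current catalog and session state
    /// (target cluster and replica, transaction operations, log reads, timeline)
    /// and constructs the first stage of the pipeline.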
    #[instrument]
    fn subscribe_validate(
        &self,
        session: &mut Session,
        plan: plan::SubscribePlan,
        target_cluster: TargetCluster,
        explain_ctx: ExplainContext,
    ) -> Result<SubscribeStage, AdapterError> {
        let plan::SubscribePlan { from, when, .. } = &plan;

        let cluster = self
            .catalog()
            .resolve_target_cluster(target_cluster, session)?;
        let cluster_id = cluster.id;

        // A SUBSCRIBE that will actually run (`needs_cluster`) requires at least one replica on
        // the target cluster.
        if explain_ctx.needs_cluster() && cluster.replicas().next().is_none() {
            return Err(AdapterError::NoClusterReplicasAvailable {
                name: cluster.name.clone(),
                is_managed: cluster.is_managed(),
            });
        }

        let mut replica_id = session
            .vars()
            .cluster_replica()
            .map(|name| {
                cluster
                    .replica_id(name)
                    .ok_or(AdapterError::UnknownClusterReplica {
                        cluster_name: cluster.name.clone(),
                        replica_name: name.to_string(),
                    })
            })
            .transpose()?;

        if when == &QueryWhen::Immediately {
            // Register the SUBSCRIBE with the transaction; this errors if the transaction already
            // contains operations that are incompatible with a subscribe.
            session.add_transaction_ops(TransactionOps::Subscribe)?;
        }

        let depends_on = from.depends_on();

        // Check for reads from introspection (log) sources and surface any resulting notices.
        let notices = check_log_reads(
            self.catalog(),
            cluster,
            &depends_on,
            &mut replica_id,
            session.vars(),
        )?;
        session.add_notices(notices);

        // Determine the timeline context for the query's dependencies.
        let mut timeline = self
            .catalog()
            .validate_timeline_context(depends_on.iter().copied())?;
        if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
            // Queries with temporal functions must be executed at a specific timestamp, even if
            // their inputs are timestamp independent.
            timeline = TimelineContext::TimestampDependent;
        }

        let dependencies = depends_on
            .iter()
            .map(|id| self.catalog().resolve_item_id(id))
            .collect();
        let validity = PlanValidity::new(
            self.catalog().transient_revision(),
            dependencies,
            Some(cluster_id),
            replica_id,
            session.role_metadata().clone(),
        );

        Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
            validity,
            plan,
            timeline,
            dependency_ids: depends_on,
            cluster_id,
            replica_id,
            explain_ctx,
        }))
    }

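    /// Optimizes the `SUBSCRIBE` input to a global MIR plan on a blocking task,
    /// extending the plan validity with any dependencies introduced by the
    /// optimizer.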
    #[instrument]
    fn subscribe_optimize_mir(
        &self,
        SubscribeOptimizeMir {
            mut validity,
            plan,
            timeline,
            dependency_ids,
            cluster_id,
            replica_id,
            explain_ctx,
        }: SubscribeOptimizeMir,
    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
        let plan::SubscribePlan {
            with_snapshot,
            up_to,
            ..
        } = &plan;

        // Collect optimizer parameters.
        let compute_instance = self
            .instance_snapshot(cluster_id)
            .expect("compute instance does not exist");
        let (_, view_id) = self.allocate_transient_id();
        let (_, sink_id) = self.allocate_transient_id();
        let debug_name = format!("subscribe-{}", sink_id);
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(cluster_id).config.features())
            .override_from(&explain_ctx);

        // Build an optimizer for this SUBSCRIBE.
        let mut optimizer = optimize::subscribe::Optimizer::new(
            self.owned_catalog(),
            compute_instance,
            view_id,
            sink_id,
            *with_snapshot,
            *up_to,
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
        );
        let catalog = self.owned_catalog();

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize subscribe (mir)",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    // Optimize the SUBSCRIBE's input to a global MIR plan.
                    let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
                    // The optimized plan may depend on additional catalog items (e.g. indexes),
                    // so extend the plan validity with them.
                    validity.extend_dependencies(
                        global_mir_plan
                            .id_bundle(optimizer.cluster_id())
                            .iter()
                            .map(|id| catalog.resolve_item_id(&id)),
                    );

                    let stage =
                        SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
                            validity,
                            plan,
                            timeline,
                            optimizer,
                            global_mir_plan,
                            dependency_ids,
                            replica_id,
                            explain_ctx,
                        });
                    Ok(Box::new(stage))
                })
            },
        )))
    }

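    /// Determines the `as_of` timestamp, installs read holds for the transaction,
    /// and lowers the MIR plan to LIR on a blocking task.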
    #[instrument]
    async fn subscribe_timestamp_optimize_lir(
        &mut self,
        ctx: &ExecuteContext,
        SubscribeTimestampOptimizeLir {
            validity,
            plan,
            timeline,
            mut optimizer,
            global_mir_plan,
            dependency_ids,
            replica_id,
            explain_ctx,
        }: SubscribeTimestampOptimizeLir,
    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
        let plan::SubscribePlan { when, .. } = &plan;

        // Timestamp selection.
        let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
        let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
        let (determination, read_holds) = self.determine_timestamp(
            ctx.session(),
            bundle,
            when,
            optimizer.cluster_id(),
            &timeline,
            oracle_read_ts,
            None,
        )?;

        let as_of = determination.timestamp_context.timestamp_or_default();

        if let Some(id) = ctx.extra().contents() {
            self.set_statement_execution_timestamp(id, as_of);
        }
        if let Some(up_to) = optimizer.up_to() {
            if as_of == up_to {
                ctx.session()
                    .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
            } else if as_of > up_to {
                return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
            }
        }

        // Stash the read holds with the connection; they are released in `subscribe_finish`.
        self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);

        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));

        let span = Span::current();
        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
            || "optimize subscribe (lir)",
            move || {
                span.in_scope(|| {
                    let _dispatch_guard = explain_ctx.dispatch_guard();

                    let cluster_id = optimizer.cluster_id();

                    // Lower the resolved MIR plan to an LIR plan.
                    let mut pipeline = || -> Result<_, AdapterError> {
                        let global_lir_plan =
                            optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
                        Ok(global_lir_plan)
                    };

                    let stage = match pipeline() {
                        Ok(global_lir_plan) => {
                            if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                                let (_, df_meta) = global_lir_plan.unapply();
                                SubscribeStage::Explain(SubscribeExplain {
                                    validity,
                                    optimizer,
                                    df_meta,
                                    cluster_id,
                                    explain_ctx,
                                })
                            } else {
                                SubscribeStage::Finish(SubscribeFinish {
                                    validity,
                                    cluster_id,
                                    plan,
                                    global_lir_plan,
                                    dependency_ids,
                                    replica_id,
                                })
                            }
                        }
                        Err(err) => {
                            let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                                return Err(err);
                            };

                            if explain_ctx.broken {
                                // A `broken` EXPLAIN surfaces the error and proceeds with empty
                                // dataflow metadata so the trace can still be rendered.
                                tracing::error!("error while handling EXPLAIN statement: {}", err);
                                SubscribeStage::Explain(SubscribeExplain {
                                    validity,
                                    optimizer,
                                    df_meta: Default::default(),
                                    cluster_id,
                                    explain_ctx,
                                })
                            } else {
                                return Err(err);
                            }
                        }
                    };

                    Ok(Box::new(stage))
                })
            },
        )))
    }

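    /// Registers the active subscribe, ships the dataflow to the target cluster,
    /// releases the transaction read holds, and returns the response stream to
    /// the client.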
    #[instrument]
    async fn subscribe_finish(
        &mut self,
        ctx: &mut ExecuteContext,
        SubscribeFinish {
            validity: _,
            cluster_id,
            plan:
                plan::SubscribePlan {
                    copy_to,
                    emit_progress,
                    output,
                    ..
                },
            global_lir_plan,
            dependency_ids,
            replica_id,
        }: SubscribeFinish,
    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
        let sink_id = global_lir_plan.sink_id();

        let (tx, rx) = mpsc::unbounded_channel();
        let active_subscribe = ActiveSubscribe {
            conn_id: ctx.session().conn_id().clone(),
            session_uuid: ctx.session().uuid(),
            channel: tx,
            emit_progress,
            as_of: global_lir_plan
                .as_of()
                .expect("set to Some in an earlier stage"),
            arity: global_lir_plan.sink_desc().from_desc.arity(),
            cluster_id,
            depends_on: dependency_ids,
            start_time: self.now(),
            output,
        };
        active_subscribe.initialize();

        let (df_desc, df_meta) = global_lir_plan.unapply();

        // Emit notices produced by the optimizer.
        emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);

        // Register the new subscribe as an active compute sink and ship the dataflow, awaiting
        // both concurrently.
        let write_notify_fut = self
            .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
            .await;
        let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);

        let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;

        // Release the read holds taken while determining the timestamp; they are no longer
        // needed once the dataflow has been shipped.
        let txn_read_holds = self
            .txn_read_holds
            .remove(ctx.session().conn_id())
            .expect("must have previously installed read holds");

        drop(txn_read_holds);

        let resp = ExecuteResponse::Subscribing {
            rx,
            ctx_extra: std::mem::take(ctx.extra_mut()),
            instance_id: cluster_id,
        };
        let resp = match copy_to {
            None => resp,
            Some(format) => ExecuteResponse::CopyTo {
                format,
                resp: Box::new(resp),
            },
        };
        Ok(StageResult::Response(resp))
    }

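    /// Renders the optimizer trace collected during the earlier stages into
    /// `EXPLAIN` output rows.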
    #[instrument]
    async fn subscribe_explain(
        &self,
        session: &Session,
        SubscribeExplain {
            optimizer,
            df_meta,
            cluster_id,
            explain_ctx:
                ExplainPlanContext {
                    config,
                    format,
                    stage,
                    optimizer_trace,
                    desc,
                    ..
                },
            ..
        }: SubscribeExplain,
    ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
        let session_catalog = self.catalog().for_session(session);

        let expr_humanizer = {
            let transient_items = btreemap! {
                optimizer.sink_id() => TransientItem::new(
                    Some(vec![GlobalId::Explain.to_string()]),
                    desc.map(|d| d.iter_names().map(|c| c.to_string()).collect()),
                )
            };
            ExprHumanizerExt::new(transient_items, &session_catalog)
        };

        let target_cluster = self.catalog().get_cluster(cluster_id);

        let features = OptimizerFeatures::from(self.catalog().system_config())
            .override_from(&target_cluster.config.features())
            .override_from(&config.features);

        let rows = optimizer_trace
            .into_rows(
                format,
                &config,
                &features,
                &expr_humanizer,
                None,
                Some(target_cluster),
                df_meta,
                stage,
                plan::ExplaineeStatementKind::Subscribe,
                None,
            )
            .await?;

        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
    }
}