1use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
21use crate::coord::{
22 Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
23 SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
24};
25use crate::error::AdapterError;
26use crate::optimize::Optimize;
27use crate::session::{Session, TransactionOps};
28use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
29
30impl Staged for SubscribeStage {
31 type Ctx = ExecuteContext;
32
33 fn validity(&mut self) -> &mut PlanValidity {
34 match self {
35 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
36 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
37 SubscribeStage::Finish(stage) => &mut stage.validity,
38 }
39 }
40
41 async fn stage(
42 self,
43 coord: &mut Coordinator,
44 ctx: &mut ExecuteContext,
45 ) -> Result<StageResult<Box<Self>>, AdapterError> {
46 match self {
47 SubscribeStage::OptimizeMir(stage) => {
48 coord.subscribe_optimize_mir(ctx.session(), stage)
49 }
50 SubscribeStage::TimestampOptimizeLir(stage) => {
51 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
52 }
53 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
54 }
55 }
56
57 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
58 Message::SubscribeStageReady {
59 ctx,
60 span,
61 stage: self,
62 }
63 }
64
65 fn cancel_enabled(&self) -> bool {
66 true
67 }
68}
69
70impl Coordinator {
71 #[instrument]
72 pub(crate) async fn sequence_subscribe(
73 &mut self,
74 mut ctx: ExecuteContext,
75 plan: plan::SubscribePlan,
76 target_cluster: TargetCluster,
77 ) {
78 let stage = return_if_err!(
79 self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
80 ctx
81 );
82 self.sequence_staged(ctx, Span::current(), stage).await;
83 }
84
85 #[instrument]
86 fn subscribe_validate(
87 &mut self,
88 session: &mut Session,
89 plan: plan::SubscribePlan,
90 target_cluster: TargetCluster,
91 ) -> Result<SubscribeStage, AdapterError> {
92 let plan::SubscribePlan { from, when, .. } = &plan;
93
94 let cluster = self
95 .catalog()
96 .resolve_target_cluster(target_cluster, session)?;
97 let cluster_id = cluster.id;
98
99 if cluster.replicas().next().is_none() {
100 return Err(AdapterError::NoClusterReplicasAvailable {
101 name: cluster.name.clone(),
102 is_managed: cluster.is_managed(),
103 });
104 }
105
106 let mut replica_id = session
107 .vars()
108 .cluster_replica()
109 .map(|name| {
110 cluster
111 .replica_id(name)
112 .ok_or(AdapterError::UnknownClusterReplica {
113 cluster_name: cluster.name.clone(),
114 replica_name: name.to_string(),
115 })
116 })
117 .transpose()?;
118
119 if when == &QueryWhen::Immediately {
122 session.add_transaction_ops(TransactionOps::Subscribe)?;
125 }
126
127 let depends_on = from.depends_on();
128
129 let notices = check_log_reads(
131 self.catalog(),
132 cluster,
133 &depends_on,
134 &mut replica_id,
135 session.vars(),
136 )?;
137 session.add_notices(notices);
138
139 let mut timeline = self.validate_timeline_context(depends_on.iter().copied())?;
141 if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
142 timeline = TimelineContext::TimestampDependent;
145 }
146
147 let dependencies = depends_on
148 .iter()
149 .map(|id| self.catalog().resolve_item_id(id))
150 .collect();
151 let validity = PlanValidity::new(
152 self.catalog().transient_revision(),
153 dependencies,
154 Some(cluster_id),
155 replica_id,
156 session.role_metadata().clone(),
157 );
158
159 Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
160 validity,
161 plan,
162 timeline,
163 dependency_ids: depends_on,
164 cluster_id,
165 replica_id,
166 }))
167 }
168
169 #[instrument]
170 fn subscribe_optimize_mir(
171 &mut self,
172 session: &Session,
173 SubscribeOptimizeMir {
174 mut validity,
175 plan,
176 timeline,
177 dependency_ids,
178 cluster_id,
179 replica_id,
180 }: SubscribeOptimizeMir,
181 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
182 let plan::SubscribePlan {
183 with_snapshot,
184 up_to,
185 ..
186 } = &plan;
187
188 let compute_instance = self
190 .instance_snapshot(cluster_id)
191 .expect("compute instance does not exist");
192 let (_, view_id) = self.allocate_transient_id();
193 let (_, sink_id) = self.allocate_transient_id();
194 let conn_id = session.conn_id().clone();
195 let up_to = up_to
196 .as_ref()
197 .map(|expr| Coordinator::evaluate_when(self.catalog().state(), expr.clone(), session))
198 .transpose()?;
199 let debug_name = format!("subscribe-{}", sink_id);
200 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
201 .override_from(&self.catalog.get_cluster(cluster_id).config.features());
202
203 let mut optimizer = optimize::subscribe::Optimizer::new(
205 self.owned_catalog(),
206 compute_instance,
207 view_id,
208 sink_id,
209 Some(conn_id),
210 *with_snapshot,
211 up_to,
212 debug_name,
213 optimizer_config,
214 self.optimizer_metrics(),
215 );
216 let catalog = self.owned_catalog();
217
218 let span = Span::current();
219 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
220 || "optimize subscribe (mir)",
221 move || {
222 span.in_scope(|| {
223 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
225 validity.extend_dependencies(
227 global_mir_plan
228 .id_bundle(optimizer.cluster_id())
229 .iter()
230 .map(|id| catalog.resolve_item_id(&id)),
231 );
232
233 let stage =
234 SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
235 validity,
236 plan,
237 timeline,
238 optimizer,
239 global_mir_plan,
240 dependency_ids,
241 replica_id,
242 });
243 Ok(Box::new(stage))
244 })
245 },
246 )))
247 }
248
249 #[instrument]
250 async fn subscribe_timestamp_optimize_lir(
251 &mut self,
252 ctx: &ExecuteContext,
253 SubscribeTimestampOptimizeLir {
254 validity,
255 plan,
256 timeline,
257 mut optimizer,
258 global_mir_plan,
259 dependency_ids,
260 replica_id,
261 }: SubscribeTimestampOptimizeLir,
262 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
263 let plan::SubscribePlan { when, .. } = &plan;
264
265 let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
267 let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
268 let (determination, read_holds) = self.determine_timestamp(
269 ctx.session(),
270 bundle,
271 when,
272 optimizer.cluster_id(),
273 &timeline,
274 oracle_read_ts,
275 None,
276 )?;
277
278 let as_of = determination.timestamp_context.timestamp_or_default();
279
280 if let Some(id) = ctx.extra().contents() {
281 self.set_statement_execution_timestamp(id, as_of);
282 }
283 if let Some(up_to) = optimizer.up_to() {
284 if as_of == up_to {
285 ctx.session()
286 .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
287 } else if as_of > up_to {
288 return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
289 }
290 }
291
292 self.store_transaction_read_holds(ctx.session(), read_holds);
293
294 let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
295
296 let span = Span::current();
298 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
299 || "optimize subscribe (lir)",
300 move || {
301 span.in_scope(|| {
302 let global_lir_plan =
304 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
305
306 let stage = SubscribeStage::Finish(SubscribeFinish {
307 validity,
308 cluster_id: optimizer.cluster_id(),
309 plan,
310 global_lir_plan,
311 dependency_ids,
312 replica_id,
313 });
314 Ok(Box::new(stage))
315 })
316 },
317 )))
318 }
319
320 #[instrument]
321 async fn subscribe_finish(
322 &mut self,
323 ctx: &mut ExecuteContext,
324 SubscribeFinish {
325 validity: _,
326 cluster_id,
327 plan:
328 plan::SubscribePlan {
329 copy_to,
330 emit_progress,
331 output,
332 ..
333 },
334 global_lir_plan,
335 dependency_ids,
336 replica_id,
337 }: SubscribeFinish,
338 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
339 let sink_id = global_lir_plan.sink_id();
340
341 let (tx, rx) = mpsc::unbounded_channel();
342 let active_subscribe = ActiveSubscribe {
343 conn_id: ctx.session().conn_id().clone(),
344 session_uuid: ctx.session().uuid(),
345 channel: tx,
346 emit_progress,
347 as_of: global_lir_plan
348 .as_of()
349 .expect("set to Some in an earlier stage"),
350 arity: global_lir_plan.sink_desc().from_desc.arity(),
351 cluster_id,
352 depends_on: dependency_ids,
353 start_time: self.now(),
354 output,
355 };
356 active_subscribe.initialize();
357
358 let (df_desc, df_meta) = global_lir_plan.unapply();
359
360 self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
362
363 let write_notify_fut = self
365 .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
366 .await;
367 let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
369
370 let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
373
374 let txn_read_holds = self
376 .txn_read_holds
377 .remove(ctx.session().conn_id())
378 .expect("must have previously installed read holds");
379
380 drop(txn_read_holds);
382
383 let resp = ExecuteResponse::Subscribing {
384 rx,
385 ctx_extra: std::mem::take(ctx.extra_mut()),
386 instance_id: cluster_id,
387 };
388 let resp = match copy_to {
389 None => resp,
390 Some(format) => ExecuteResponse::CopyTo {
391 format,
392 resp: Box::new(resp),
393 },
394 };
395 Ok(StageResult::Response(resp))
396 }
397}