1use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
22use crate::coord::{
23 Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
24 SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::Optimize;
28use crate::session::{Session, TransactionOps};
29use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
30
31impl Staged for SubscribeStage {
32 type Ctx = ExecuteContext;
33
34 fn validity(&mut self) -> &mut PlanValidity {
35 match self {
36 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
37 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
38 SubscribeStage::Finish(stage) => &mut stage.validity,
39 }
40 }
41
42 async fn stage(
43 self,
44 coord: &mut Coordinator,
45 ctx: &mut ExecuteContext,
46 ) -> Result<StageResult<Box<Self>>, AdapterError> {
47 match self {
48 SubscribeStage::OptimizeMir(stage) => {
49 coord.subscribe_optimize_mir(ctx.session(), stage)
50 }
51 SubscribeStage::TimestampOptimizeLir(stage) => {
52 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
53 }
54 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
55 }
56 }
57
58 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
59 Message::SubscribeStageReady {
60 ctx,
61 span,
62 stage: self,
63 }
64 }
65
66 fn cancel_enabled(&self) -> bool {
67 true
68 }
69}
70
71impl Coordinator {
72 #[instrument]
73 pub(crate) async fn sequence_subscribe(
74 &mut self,
75 mut ctx: ExecuteContext,
76 plan: plan::SubscribePlan,
77 target_cluster: TargetCluster,
78 ) {
79 let stage = return_if_err!(
80 self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
81 ctx
82 );
83 self.sequence_staged(ctx, Span::current(), stage).await;
84 }
85
86 #[instrument]
87 fn subscribe_validate(
88 &mut self,
89 session: &mut Session,
90 plan: plan::SubscribePlan,
91 target_cluster: TargetCluster,
92 ) -> Result<SubscribeStage, AdapterError> {
93 let plan::SubscribePlan { from, when, .. } = &plan;
94
95 let cluster = self
96 .catalog()
97 .resolve_target_cluster(target_cluster, session)?;
98 let cluster_id = cluster.id;
99
100 if cluster.replicas().next().is_none() {
101 return Err(AdapterError::NoClusterReplicasAvailable {
102 name: cluster.name.clone(),
103 is_managed: cluster.is_managed(),
104 });
105 }
106
107 let mut replica_id = session
108 .vars()
109 .cluster_replica()
110 .map(|name| {
111 cluster
112 .replica_id(name)
113 .ok_or(AdapterError::UnknownClusterReplica {
114 cluster_name: cluster.name.clone(),
115 replica_name: name.to_string(),
116 })
117 })
118 .transpose()?;
119
120 if when == &QueryWhen::Immediately {
123 session.add_transaction_ops(TransactionOps::Subscribe)?;
126 }
127
128 let depends_on = from.depends_on();
129
130 let notices = check_log_reads(
132 self.catalog(),
133 cluster,
134 &depends_on,
135 &mut replica_id,
136 session.vars(),
137 )?;
138 session.add_notices(notices);
139
140 let mut timeline = self
142 .catalog()
143 .validate_timeline_context(depends_on.iter().copied())?;
144 if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
145 timeline = TimelineContext::TimestampDependent;
148 }
149
150 let dependencies = depends_on
151 .iter()
152 .map(|id| self.catalog().resolve_item_id(id))
153 .collect();
154 let validity = PlanValidity::new(
155 self.catalog().transient_revision(),
156 dependencies,
157 Some(cluster_id),
158 replica_id,
159 session.role_metadata().clone(),
160 );
161
162 Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
163 validity,
164 plan,
165 timeline,
166 dependency_ids: depends_on,
167 cluster_id,
168 replica_id,
169 }))
170 }
171
172 #[instrument]
173 fn subscribe_optimize_mir(
174 &mut self,
175 session: &Session,
176 SubscribeOptimizeMir {
177 mut validity,
178 plan,
179 timeline,
180 dependency_ids,
181 cluster_id,
182 replica_id,
183 }: SubscribeOptimizeMir,
184 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
185 let plan::SubscribePlan {
186 with_snapshot,
187 up_to,
188 ..
189 } = &plan;
190
191 let compute_instance = self
193 .instance_snapshot(cluster_id)
194 .expect("compute instance does not exist");
195 let (_, view_id) = self.allocate_transient_id();
196 let (_, sink_id) = self.allocate_transient_id();
197 let conn_id = session.conn_id().clone();
198 let debug_name = format!("subscribe-{}", sink_id);
199 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
200 .override_from(&self.catalog.get_cluster(cluster_id).config.features());
201
202 let mut optimizer = optimize::subscribe::Optimizer::new(
204 self.owned_catalog(),
205 compute_instance,
206 view_id,
207 sink_id,
208 Some(conn_id),
209 *with_snapshot,
210 *up_to,
211 debug_name,
212 optimizer_config,
213 self.optimizer_metrics(),
214 );
215 let catalog = self.owned_catalog();
216
217 let span = Span::current();
218 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
219 || "optimize subscribe (mir)",
220 move || {
221 span.in_scope(|| {
222 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
224 validity.extend_dependencies(
226 global_mir_plan
227 .id_bundle(optimizer.cluster_id())
228 .iter()
229 .map(|id| catalog.resolve_item_id(&id)),
230 );
231
232 let stage =
233 SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
234 validity,
235 plan,
236 timeline,
237 optimizer,
238 global_mir_plan,
239 dependency_ids,
240 replica_id,
241 });
242 Ok(Box::new(stage))
243 })
244 },
245 )))
246 }
247
248 #[instrument]
249 async fn subscribe_timestamp_optimize_lir(
250 &mut self,
251 ctx: &ExecuteContext,
252 SubscribeTimestampOptimizeLir {
253 validity,
254 plan,
255 timeline,
256 mut optimizer,
257 global_mir_plan,
258 dependency_ids,
259 replica_id,
260 }: SubscribeTimestampOptimizeLir,
261 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
262 let plan::SubscribePlan { when, .. } = &plan;
263
264 let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
266 let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
267 let (determination, read_holds) = self.determine_timestamp(
268 ctx.session(),
269 bundle,
270 when,
271 optimizer.cluster_id(),
272 &timeline,
273 oracle_read_ts,
274 None,
275 )?;
276
277 let as_of = determination.timestamp_context.timestamp_or_default();
278
279 if let Some(id) = ctx.extra().contents() {
280 self.set_statement_execution_timestamp(id, as_of);
281 }
282 if let Some(up_to) = optimizer.up_to() {
283 if as_of == up_to {
284 ctx.session()
285 .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
286 } else if as_of > up_to {
287 return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
288 }
289 }
290
291 self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
292
293 let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
294
295 let span = Span::current();
297 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
298 || "optimize subscribe (lir)",
299 move || {
300 span.in_scope(|| {
301 let global_lir_plan =
303 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
304
305 let stage = SubscribeStage::Finish(SubscribeFinish {
306 validity,
307 cluster_id: optimizer.cluster_id(),
308 plan,
309 global_lir_plan,
310 dependency_ids,
311 replica_id,
312 });
313 Ok(Box::new(stage))
314 })
315 },
316 )))
317 }
318
319 #[instrument]
320 async fn subscribe_finish(
321 &mut self,
322 ctx: &mut ExecuteContext,
323 SubscribeFinish {
324 validity: _,
325 cluster_id,
326 plan:
327 plan::SubscribePlan {
328 copy_to,
329 emit_progress,
330 output,
331 ..
332 },
333 global_lir_plan,
334 dependency_ids,
335 replica_id,
336 }: SubscribeFinish,
337 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
338 let sink_id = global_lir_plan.sink_id();
339
340 let (tx, rx) = mpsc::unbounded_channel();
341 let active_subscribe = ActiveSubscribe {
342 conn_id: ctx.session().conn_id().clone(),
343 session_uuid: ctx.session().uuid(),
344 channel: tx,
345 emit_progress,
346 as_of: global_lir_plan
347 .as_of()
348 .expect("set to Some in an earlier stage"),
349 arity: global_lir_plan.sink_desc().from_desc.arity(),
350 cluster_id,
351 depends_on: dependency_ids,
352 start_time: self.now(),
353 output,
354 };
355 active_subscribe.initialize();
356
357 let (df_desc, df_meta) = global_lir_plan.unapply();
358
359 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
361
362 let write_notify_fut = self
364 .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
365 .await;
366 let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
368
369 let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
372
373 let txn_read_holds = self
375 .txn_read_holds
376 .remove(ctx.session().conn_id())
377 .expect("must have previously installed read holds");
378
379 drop(txn_read_holds);
381
382 let resp = ExecuteResponse::Subscribing {
383 rx,
384 ctx_extra: std::mem::take(ctx.extra_mut()),
385 instance_id: cluster_id,
386 };
387 let resp = match copy_to {
388 None => resp,
389 Some(format) => ExecuteResponse::CopyTo {
390 format,
391 resp: Box::new(resp),
392 },
393 };
394 Ok(StageResult::Response(resp))
395 }
396}