1use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
21use crate::coord::{
22 Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
23 SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
24};
25use crate::error::AdapterError;
26use crate::optimize::Optimize;
27use crate::session::{Session, TransactionOps};
28use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
29
30impl Staged for SubscribeStage {
31 type Ctx = ExecuteContext;
32
33 fn validity(&mut self) -> &mut PlanValidity {
34 match self {
35 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
36 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
37 SubscribeStage::Finish(stage) => &mut stage.validity,
38 }
39 }
40
41 async fn stage(
42 self,
43 coord: &mut Coordinator,
44 ctx: &mut ExecuteContext,
45 ) -> Result<StageResult<Box<Self>>, AdapterError> {
46 match self {
47 SubscribeStage::OptimizeMir(stage) => {
48 coord.subscribe_optimize_mir(ctx.session(), stage)
49 }
50 SubscribeStage::TimestampOptimizeLir(stage) => {
51 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
52 }
53 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
54 }
55 }
56
57 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
58 Message::SubscribeStageReady {
59 ctx,
60 span,
61 stage: self,
62 }
63 }
64
65 fn cancel_enabled(&self) -> bool {
66 true
67 }
68}
69
70impl Coordinator {
71 #[instrument]
72 pub(crate) async fn sequence_subscribe(
73 &mut self,
74 mut ctx: ExecuteContext,
75 plan: plan::SubscribePlan,
76 target_cluster: TargetCluster,
77 ) {
78 let stage = return_if_err!(
79 self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
80 ctx
81 );
82 self.sequence_staged(ctx, Span::current(), stage).await;
83 }
84
85 #[instrument]
86 fn subscribe_validate(
87 &mut self,
88 session: &mut Session,
89 plan: plan::SubscribePlan,
90 target_cluster: TargetCluster,
91 ) -> Result<SubscribeStage, AdapterError> {
92 let plan::SubscribePlan { from, when, .. } = &plan;
93
94 let cluster = self
95 .catalog()
96 .resolve_target_cluster(target_cluster, session)?;
97 let cluster_id = cluster.id;
98
99 if cluster.replicas().next().is_none() {
100 return Err(AdapterError::NoClusterReplicasAvailable {
101 name: cluster.name.clone(),
102 is_managed: cluster.is_managed(),
103 });
104 }
105
106 let mut replica_id = session
107 .vars()
108 .cluster_replica()
109 .map(|name| {
110 cluster
111 .replica_id(name)
112 .ok_or(AdapterError::UnknownClusterReplica {
113 cluster_name: cluster.name.clone(),
114 replica_name: name.to_string(),
115 })
116 })
117 .transpose()?;
118
119 if when == &QueryWhen::Immediately {
122 session.add_transaction_ops(TransactionOps::Subscribe)?;
125 }
126
127 let depends_on = from.depends_on();
128
129 let notices = check_log_reads(
131 self.catalog(),
132 cluster,
133 &depends_on,
134 &mut replica_id,
135 session.vars(),
136 )?;
137 session.add_notices(notices);
138
139 let mut timeline = self
141 .catalog()
142 .validate_timeline_context(depends_on.iter().copied())?;
143 if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
144 timeline = TimelineContext::TimestampDependent;
147 }
148
149 let dependencies = depends_on
150 .iter()
151 .map(|id| self.catalog().resolve_item_id(id))
152 .collect();
153 let validity = PlanValidity::new(
154 self.catalog().transient_revision(),
155 dependencies,
156 Some(cluster_id),
157 replica_id,
158 session.role_metadata().clone(),
159 );
160
161 Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
162 validity,
163 plan,
164 timeline,
165 dependency_ids: depends_on,
166 cluster_id,
167 replica_id,
168 }))
169 }
170
171 #[instrument]
172 fn subscribe_optimize_mir(
173 &mut self,
174 session: &Session,
175 SubscribeOptimizeMir {
176 mut validity,
177 plan,
178 timeline,
179 dependency_ids,
180 cluster_id,
181 replica_id,
182 }: SubscribeOptimizeMir,
183 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
184 let plan::SubscribePlan {
185 with_snapshot,
186 up_to,
187 ..
188 } = &plan;
189
190 let compute_instance = self
192 .instance_snapshot(cluster_id)
193 .expect("compute instance does not exist");
194 let (_, view_id) = self.allocate_transient_id();
195 let (_, sink_id) = self.allocate_transient_id();
196 let conn_id = session.conn_id().clone();
197 let debug_name = format!("subscribe-{}", sink_id);
198 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
199 .override_from(&self.catalog.get_cluster(cluster_id).config.features());
200
201 let mut optimizer = optimize::subscribe::Optimizer::new(
203 self.owned_catalog(),
204 compute_instance,
205 view_id,
206 sink_id,
207 Some(conn_id),
208 *with_snapshot,
209 *up_to,
210 debug_name,
211 optimizer_config,
212 self.optimizer_metrics(),
213 );
214 let catalog = self.owned_catalog();
215
216 let span = Span::current();
217 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
218 || "optimize subscribe (mir)",
219 move || {
220 span.in_scope(|| {
221 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
223 validity.extend_dependencies(
225 global_mir_plan
226 .id_bundle(optimizer.cluster_id())
227 .iter()
228 .map(|id| catalog.resolve_item_id(&id)),
229 );
230
231 let stage =
232 SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
233 validity,
234 plan,
235 timeline,
236 optimizer,
237 global_mir_plan,
238 dependency_ids,
239 replica_id,
240 });
241 Ok(Box::new(stage))
242 })
243 },
244 )))
245 }
246
247 #[instrument]
248 async fn subscribe_timestamp_optimize_lir(
249 &mut self,
250 ctx: &ExecuteContext,
251 SubscribeTimestampOptimizeLir {
252 validity,
253 plan,
254 timeline,
255 mut optimizer,
256 global_mir_plan,
257 dependency_ids,
258 replica_id,
259 }: SubscribeTimestampOptimizeLir,
260 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
261 let plan::SubscribePlan { when, .. } = &plan;
262
263 let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
265 let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
266 let (determination, read_holds) = self.determine_timestamp(
267 ctx.session(),
268 bundle,
269 when,
270 optimizer.cluster_id(),
271 &timeline,
272 oracle_read_ts,
273 None,
274 )?;
275
276 let as_of = determination.timestamp_context.timestamp_or_default();
277
278 if let Some(id) = ctx.extra().contents() {
279 self.set_statement_execution_timestamp(id, as_of);
280 }
281 if let Some(up_to) = optimizer.up_to() {
282 if as_of == up_to {
283 ctx.session()
284 .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
285 } else if as_of > up_to {
286 return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
287 }
288 }
289
290 self.store_transaction_read_holds(ctx.session(), read_holds);
291
292 let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
293
294 let span = Span::current();
296 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
297 || "optimize subscribe (lir)",
298 move || {
299 span.in_scope(|| {
300 let global_lir_plan =
302 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
303
304 let stage = SubscribeStage::Finish(SubscribeFinish {
305 validity,
306 cluster_id: optimizer.cluster_id(),
307 plan,
308 global_lir_plan,
309 dependency_ids,
310 replica_id,
311 });
312 Ok(Box::new(stage))
313 })
314 },
315 )))
316 }
317
318 #[instrument]
319 async fn subscribe_finish(
320 &mut self,
321 ctx: &mut ExecuteContext,
322 SubscribeFinish {
323 validity: _,
324 cluster_id,
325 plan:
326 plan::SubscribePlan {
327 copy_to,
328 emit_progress,
329 output,
330 ..
331 },
332 global_lir_plan,
333 dependency_ids,
334 replica_id,
335 }: SubscribeFinish,
336 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
337 let sink_id = global_lir_plan.sink_id();
338
339 let (tx, rx) = mpsc::unbounded_channel();
340 let active_subscribe = ActiveSubscribe {
341 conn_id: ctx.session().conn_id().clone(),
342 session_uuid: ctx.session().uuid(),
343 channel: tx,
344 emit_progress,
345 as_of: global_lir_plan
346 .as_of()
347 .expect("set to Some in an earlier stage"),
348 arity: global_lir_plan.sink_desc().from_desc.arity(),
349 cluster_id,
350 depends_on: dependency_ids,
351 start_time: self.now(),
352 output,
353 };
354 active_subscribe.initialize();
355
356 let (df_desc, df_meta) = global_lir_plan.unapply();
357
358 self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
360
361 let write_notify_fut = self
363 .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
364 .await;
365 let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
367
368 let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
371
372 let txn_read_holds = self
374 .txn_read_holds
375 .remove(ctx.session().conn_id())
376 .expect("must have previously installed read holds");
377
378 drop(txn_read_holds);
380
381 let resp = ExecuteResponse::Subscribing {
382 rx,
383 ctx_extra: std::mem::take(ctx.extra_mut()),
384 instance_id: cluster_id,
385 };
386 let resp = match copy_to {
387 None => resp,
388 Some(format) => ExecuteResponse::CopyTo {
389 format,
390 resp: Box::new(resp),
391 },
392 };
393 Ok(StageResult::Response(resp))
394 }
395}