1use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::sequencer::{check_log_reads, emit_optimizer_notices};
22use crate::coord::{
23 Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
24 SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::Optimize;
28use crate::session::{Session, TransactionOps};
29use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
30
31impl Staged for SubscribeStage {
32 type Ctx = ExecuteContext;
33
34 fn validity(&mut self) -> &mut PlanValidity {
35 match self {
36 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
37 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
38 SubscribeStage::Finish(stage) => &mut stage.validity,
39 }
40 }
41
42 async fn stage(
43 self,
44 coord: &mut Coordinator,
45 ctx: &mut ExecuteContext,
46 ) -> Result<StageResult<Box<Self>>, AdapterError> {
47 match self {
48 SubscribeStage::OptimizeMir(stage) => coord.subscribe_optimize_mir(stage),
49 SubscribeStage::TimestampOptimizeLir(stage) => {
50 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
51 }
52 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
53 }
54 }
55
56 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
57 Message::SubscribeStageReady {
58 ctx,
59 span,
60 stage: self,
61 }
62 }
63
64 fn cancel_enabled(&self) -> bool {
65 true
66 }
67}
68
69impl Coordinator {
70 #[instrument]
71 pub(crate) async fn sequence_subscribe(
72 &mut self,
73 mut ctx: ExecuteContext,
74 plan: plan::SubscribePlan,
75 target_cluster: TargetCluster,
76 ) {
77 let stage = return_if_err!(
78 self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
79 ctx
80 );
81 self.sequence_staged(ctx, Span::current(), stage).await;
82 }
83
84 #[instrument]
85 fn subscribe_validate(
86 &self,
87 session: &mut Session,
88 plan: plan::SubscribePlan,
89 target_cluster: TargetCluster,
90 ) -> Result<SubscribeStage, AdapterError> {
91 let plan::SubscribePlan { from, when, .. } = &plan;
92
93 let cluster = self
94 .catalog()
95 .resolve_target_cluster(target_cluster, session)?;
96 let cluster_id = cluster.id;
97
98 if cluster.replicas().next().is_none() {
99 return Err(AdapterError::NoClusterReplicasAvailable {
100 name: cluster.name.clone(),
101 is_managed: cluster.is_managed(),
102 });
103 }
104
105 let mut replica_id = session
106 .vars()
107 .cluster_replica()
108 .map(|name| {
109 cluster
110 .replica_id(name)
111 .ok_or(AdapterError::UnknownClusterReplica {
112 cluster_name: cluster.name.clone(),
113 replica_name: name.to_string(),
114 })
115 })
116 .transpose()?;
117
118 if when == &QueryWhen::Immediately {
121 session.add_transaction_ops(TransactionOps::Subscribe)?;
124 }
125
126 let depends_on = from.depends_on();
127
128 let notices = check_log_reads(
130 self.catalog(),
131 cluster,
132 &depends_on,
133 &mut replica_id,
134 session.vars(),
135 )?;
136 session.add_notices(notices);
137
138 let mut timeline = self
140 .catalog()
141 .validate_timeline_context(depends_on.iter().copied())?;
142 if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
143 timeline = TimelineContext::TimestampDependent;
146 }
147
148 let dependencies = depends_on
149 .iter()
150 .map(|id| self.catalog().resolve_item_id(id))
151 .collect();
152 let validity = PlanValidity::new(
153 self.catalog().transient_revision(),
154 dependencies,
155 Some(cluster_id),
156 replica_id,
157 session.role_metadata().clone(),
158 );
159
160 Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
161 validity,
162 plan,
163 timeline,
164 dependency_ids: depends_on,
165 cluster_id,
166 replica_id,
167 }))
168 }
169
170 #[instrument]
171 fn subscribe_optimize_mir(
172 &self,
173 SubscribeOptimizeMir {
174 mut validity,
175 plan,
176 timeline,
177 dependency_ids,
178 cluster_id,
179 replica_id,
180 }: SubscribeOptimizeMir,
181 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
182 let plan::SubscribePlan {
183 with_snapshot,
184 up_to,
185 ..
186 } = &plan;
187
188 let compute_instance = self
190 .instance_snapshot(cluster_id)
191 .expect("compute instance does not exist");
192 let (_, view_id) = self.allocate_transient_id();
193 let (_, sink_id) = self.allocate_transient_id();
194 let debug_name = format!("subscribe-{}", sink_id);
195 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
196 .override_from(&self.catalog.get_cluster(cluster_id).config.features());
197
198 let mut optimizer = optimize::subscribe::Optimizer::new(
200 self.owned_catalog(),
201 compute_instance,
202 view_id,
203 sink_id,
204 *with_snapshot,
205 *up_to,
206 debug_name,
207 optimizer_config,
208 self.optimizer_metrics(),
209 );
210 let catalog = self.owned_catalog();
211
212 let span = Span::current();
213 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
214 || "optimize subscribe (mir)",
215 move || {
216 span.in_scope(|| {
217 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
219 validity.extend_dependencies(
221 global_mir_plan
222 .id_bundle(optimizer.cluster_id())
223 .iter()
224 .map(|id| catalog.resolve_item_id(&id)),
225 );
226
227 let stage =
228 SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
229 validity,
230 plan,
231 timeline,
232 optimizer,
233 global_mir_plan,
234 dependency_ids,
235 replica_id,
236 });
237 Ok(Box::new(stage))
238 })
239 },
240 )))
241 }
242
243 #[instrument]
244 async fn subscribe_timestamp_optimize_lir(
245 &mut self,
246 ctx: &ExecuteContext,
247 SubscribeTimestampOptimizeLir {
248 validity,
249 plan,
250 timeline,
251 mut optimizer,
252 global_mir_plan,
253 dependency_ids,
254 replica_id,
255 }: SubscribeTimestampOptimizeLir,
256 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
257 let plan::SubscribePlan { when, .. } = &plan;
258
259 let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
261 let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
262 let (determination, read_holds) = self.determine_timestamp(
263 ctx.session(),
264 bundle,
265 when,
266 optimizer.cluster_id(),
267 &timeline,
268 oracle_read_ts,
269 None,
270 )?;
271
272 let as_of = determination.timestamp_context.timestamp_or_default();
273
274 if let Some(id) = ctx.extra().contents() {
275 self.set_statement_execution_timestamp(id, as_of);
276 }
277 if let Some(up_to) = optimizer.up_to() {
278 if as_of == up_to {
279 ctx.session()
280 .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
281 } else if as_of > up_to {
282 return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
283 }
284 }
285
286 self.store_transaction_read_holds(ctx.session().conn_id().clone(), read_holds);
287
288 let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
289
290 let span = Span::current();
292 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
293 || "optimize subscribe (lir)",
294 move || {
295 span.in_scope(|| {
296 let global_lir_plan =
298 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
299
300 let stage = SubscribeStage::Finish(SubscribeFinish {
301 validity,
302 cluster_id: optimizer.cluster_id(),
303 plan,
304 global_lir_plan,
305 dependency_ids,
306 replica_id,
307 });
308 Ok(Box::new(stage))
309 })
310 },
311 )))
312 }
313
314 #[instrument]
315 async fn subscribe_finish(
316 &mut self,
317 ctx: &mut ExecuteContext,
318 SubscribeFinish {
319 validity: _,
320 cluster_id,
321 plan:
322 plan::SubscribePlan {
323 copy_to,
324 emit_progress,
325 output,
326 ..
327 },
328 global_lir_plan,
329 dependency_ids,
330 replica_id,
331 }: SubscribeFinish,
332 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
333 let sink_id = global_lir_plan.sink_id();
334
335 let (tx, rx) = mpsc::unbounded_channel();
336 let active_subscribe = ActiveSubscribe {
337 conn_id: ctx.session().conn_id().clone(),
338 session_uuid: ctx.session().uuid(),
339 channel: tx,
340 emit_progress,
341 as_of: global_lir_plan
342 .as_of()
343 .expect("set to Some in an earlier stage"),
344 arity: global_lir_plan.sink_desc().from_desc.arity(),
345 cluster_id,
346 depends_on: dependency_ids,
347 start_time: self.now(),
348 output,
349 };
350 active_subscribe.initialize();
351
352 let (df_desc, df_meta) = global_lir_plan.unapply();
353
354 emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
356
357 let write_notify_fut = self
359 .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
360 .await;
361 let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
363
364 let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
367
368 let txn_read_holds = self
370 .txn_read_holds
371 .remove(ctx.session().conn_id())
372 .expect("must have previously installed read holds");
373
374 drop(txn_read_holds);
376
377 let resp = ExecuteResponse::Subscribing {
378 rx,
379 ctx_extra: std::mem::take(ctx.extra_mut()),
380 instance_id: cluster_id,
381 };
382 let resp = match copy_to {
383 None => resp,
384 Some(format) => ExecuteResponse::CopyTo {
385 format,
386 resp: Box::new(resp),
387 },
388 };
389 Ok(StageResult::Response(resp))
390 }
391}