1use mz_ore::instrument;
11use mz_repr::optimize::OverrideFrom;
12use mz_sql::plan::{self, QueryWhen};
13use mz_sql::session::metadata::SessionMetadata;
14use timely::progress::Antichain;
15use tokio::sync::mpsc;
16use tracing::Span;
17
18use crate::active_compute_sink::{ActiveComputeSink, ActiveSubscribe};
19use crate::command::ExecuteResponse;
20use crate::coord::sequencer::inner::{check_log_reads, return_if_err};
21use crate::coord::{
22 Coordinator, Message, PlanValidity, StageResult, Staged, SubscribeFinish, SubscribeOptimizeMir,
23 SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
24};
25use crate::error::AdapterError;
26use crate::optimize::Optimize;
27use crate::session::{Session, TransactionOps};
28use crate::{AdapterNotice, ExecuteContext, TimelineContext, optimize};
29
30impl Staged for SubscribeStage {
31 type Ctx = ExecuteContext;
32
33 fn validity(&mut self) -> &mut PlanValidity {
34 match self {
35 SubscribeStage::OptimizeMir(stage) => &mut stage.validity,
36 SubscribeStage::TimestampOptimizeLir(stage) => &mut stage.validity,
37 SubscribeStage::Finish(stage) => &mut stage.validity,
38 }
39 }
40
41 async fn stage(
42 self,
43 coord: &mut Coordinator,
44 ctx: &mut ExecuteContext,
45 ) -> Result<StageResult<Box<Self>>, AdapterError> {
46 match self {
47 SubscribeStage::OptimizeMir(stage) => {
48 coord.subscribe_optimize_mir(ctx.session(), stage)
49 }
50 SubscribeStage::TimestampOptimizeLir(stage) => {
51 coord.subscribe_timestamp_optimize_lir(ctx, stage).await
52 }
53 SubscribeStage::Finish(stage) => coord.subscribe_finish(ctx, stage).await,
54 }
55 }
56
57 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
58 Message::SubscribeStageReady {
59 ctx,
60 span,
61 stage: self,
62 }
63 }
64
65 fn cancel_enabled(&self) -> bool {
66 true
67 }
68}
69
70impl Coordinator {
71 #[instrument]
72 pub(crate) async fn sequence_subscribe(
73 &mut self,
74 mut ctx: ExecuteContext,
75 plan: plan::SubscribePlan,
76 target_cluster: TargetCluster,
77 ) {
78 let stage = return_if_err!(
79 self.subscribe_validate(ctx.session_mut(), plan, target_cluster),
80 ctx
81 );
82 self.sequence_staged(ctx, Span::current(), stage).await;
83 }
84
85 #[instrument]
86 fn subscribe_validate(
87 &mut self,
88 session: &mut Session,
89 plan: plan::SubscribePlan,
90 target_cluster: TargetCluster,
91 ) -> Result<SubscribeStage, AdapterError> {
92 let plan::SubscribePlan { from, when, .. } = &plan;
93
94 let cluster = self
95 .catalog()
96 .resolve_target_cluster(target_cluster, session)?;
97 let cluster_id = cluster.id;
98
99 if cluster.replicas().next().is_none() {
100 return Err(AdapterError::NoClusterReplicasAvailable {
101 name: cluster.name.clone(),
102 is_managed: cluster.is_managed(),
103 });
104 }
105
106 let mut replica_id = session
107 .vars()
108 .cluster_replica()
109 .map(|name| {
110 cluster
111 .replica_id(name)
112 .ok_or(AdapterError::UnknownClusterReplica {
113 cluster_name: cluster.name.clone(),
114 replica_name: name.to_string(),
115 })
116 })
117 .transpose()?;
118
119 if when == &QueryWhen::Immediately {
122 session.add_transaction_ops(TransactionOps::Subscribe)?;
125 }
126
127 let depends_on = from.depends_on();
128
129 let notices = check_log_reads(
131 self.catalog(),
132 cluster,
133 &depends_on,
134 &mut replica_id,
135 session.vars(),
136 )?;
137 session.add_notices(notices);
138
139 let mut timeline = self
141 .catalog()
142 .validate_timeline_context(depends_on.iter().copied())?;
143 if matches!(timeline, TimelineContext::TimestampIndependent) && from.contains_temporal() {
144 timeline = TimelineContext::TimestampDependent;
147 }
148
149 let dependencies = depends_on
150 .iter()
151 .map(|id| self.catalog().resolve_item_id(id))
152 .collect();
153 let validity = PlanValidity::new(
154 self.catalog().transient_revision(),
155 dependencies,
156 Some(cluster_id),
157 replica_id,
158 session.role_metadata().clone(),
159 );
160
161 Ok(SubscribeStage::OptimizeMir(SubscribeOptimizeMir {
162 validity,
163 plan,
164 timeline,
165 dependency_ids: depends_on,
166 cluster_id,
167 replica_id,
168 }))
169 }
170
171 #[instrument]
172 fn subscribe_optimize_mir(
173 &mut self,
174 session: &Session,
175 SubscribeOptimizeMir {
176 mut validity,
177 plan,
178 timeline,
179 dependency_ids,
180 cluster_id,
181 replica_id,
182 }: SubscribeOptimizeMir,
183 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
184 let plan::SubscribePlan {
185 with_snapshot,
186 up_to,
187 ..
188 } = &plan;
189
190 let compute_instance = self
192 .instance_snapshot(cluster_id)
193 .expect("compute instance does not exist");
194 let (_, view_id) = self.allocate_transient_id();
195 let (_, sink_id) = self.allocate_transient_id();
196 let conn_id = session.conn_id().clone();
197 let up_to = up_to
198 .as_ref()
199 .map(|expr| Coordinator::evaluate_when(self.catalog().state(), expr.clone(), session))
200 .transpose()?;
201 let debug_name = format!("subscribe-{}", sink_id);
202 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
203 .override_from(&self.catalog.get_cluster(cluster_id).config.features());
204
205 let mut optimizer = optimize::subscribe::Optimizer::new(
207 self.owned_catalog(),
208 compute_instance,
209 view_id,
210 sink_id,
211 Some(conn_id),
212 *with_snapshot,
213 up_to,
214 debug_name,
215 optimizer_config,
216 self.optimizer_metrics(),
217 );
218 let catalog = self.owned_catalog();
219
220 let span = Span::current();
221 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
222 || "optimize subscribe (mir)",
223 move || {
224 span.in_scope(|| {
225 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from.clone())?;
227 validity.extend_dependencies(
229 global_mir_plan
230 .id_bundle(optimizer.cluster_id())
231 .iter()
232 .map(|id| catalog.resolve_item_id(&id)),
233 );
234
235 let stage =
236 SubscribeStage::TimestampOptimizeLir(SubscribeTimestampOptimizeLir {
237 validity,
238 plan,
239 timeline,
240 optimizer,
241 global_mir_plan,
242 dependency_ids,
243 replica_id,
244 });
245 Ok(Box::new(stage))
246 })
247 },
248 )))
249 }
250
251 #[instrument]
252 async fn subscribe_timestamp_optimize_lir(
253 &mut self,
254 ctx: &ExecuteContext,
255 SubscribeTimestampOptimizeLir {
256 validity,
257 plan,
258 timeline,
259 mut optimizer,
260 global_mir_plan,
261 dependency_ids,
262 replica_id,
263 }: SubscribeTimestampOptimizeLir,
264 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
265 let plan::SubscribePlan { when, .. } = &plan;
266
267 let oracle_read_ts = self.oracle_read_ts(ctx.session(), &timeline, when).await;
269 let bundle = &global_mir_plan.id_bundle(optimizer.cluster_id());
270 let (determination, read_holds) = self.determine_timestamp(
271 ctx.session(),
272 bundle,
273 when,
274 optimizer.cluster_id(),
275 &timeline,
276 oracle_read_ts,
277 None,
278 )?;
279
280 let as_of = determination.timestamp_context.timestamp_or_default();
281
282 if let Some(id) = ctx.extra().contents() {
283 self.set_statement_execution_timestamp(id, as_of);
284 }
285 if let Some(up_to) = optimizer.up_to() {
286 if as_of == up_to {
287 ctx.session()
288 .add_notice(AdapterNotice::EqualSubscribeBounds { bound: up_to });
289 } else if as_of > up_to {
290 return Err(AdapterError::AbsurdSubscribeBounds { as_of, up_to });
291 }
292 }
293
294 self.store_transaction_read_holds(ctx.session(), read_holds);
295
296 let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
297
298 let span = Span::current();
300 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
301 || "optimize subscribe (lir)",
302 move || {
303 span.in_scope(|| {
304 let global_lir_plan =
306 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
307
308 let stage = SubscribeStage::Finish(SubscribeFinish {
309 validity,
310 cluster_id: optimizer.cluster_id(),
311 plan,
312 global_lir_plan,
313 dependency_ids,
314 replica_id,
315 });
316 Ok(Box::new(stage))
317 })
318 },
319 )))
320 }
321
322 #[instrument]
323 async fn subscribe_finish(
324 &mut self,
325 ctx: &mut ExecuteContext,
326 SubscribeFinish {
327 validity: _,
328 cluster_id,
329 plan:
330 plan::SubscribePlan {
331 copy_to,
332 emit_progress,
333 output,
334 ..
335 },
336 global_lir_plan,
337 dependency_ids,
338 replica_id,
339 }: SubscribeFinish,
340 ) -> Result<StageResult<Box<SubscribeStage>>, AdapterError> {
341 let sink_id = global_lir_plan.sink_id();
342
343 let (tx, rx) = mpsc::unbounded_channel();
344 let active_subscribe = ActiveSubscribe {
345 conn_id: ctx.session().conn_id().clone(),
346 session_uuid: ctx.session().uuid(),
347 channel: tx,
348 emit_progress,
349 as_of: global_lir_plan
350 .as_of()
351 .expect("set to Some in an earlier stage"),
352 arity: global_lir_plan.sink_desc().from_desc.arity(),
353 cluster_id,
354 depends_on: dependency_ids,
355 start_time: self.now(),
356 output,
357 };
358 active_subscribe.initialize();
359
360 let (df_desc, df_meta) = global_lir_plan.unapply();
361
362 self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
364
365 let write_notify_fut = self
367 .add_active_compute_sink(sink_id, ActiveComputeSink::Subscribe(active_subscribe))
368 .await;
369 let ship_dataflow_fut = self.ship_dataflow(df_desc, cluster_id, replica_id);
371
372 let ((), ()) = futures::future::join(write_notify_fut, ship_dataflow_fut).await;
375
376 let txn_read_holds = self
378 .txn_read_holds
379 .remove(ctx.session().conn_id())
380 .expect("must have previously installed read holds");
381
382 drop(txn_read_holds);
384
385 let resp = ExecuteResponse::Subscribing {
386 rx,
387 ctx_extra: std::mem::take(ctx.extra_mut()),
388 instance_id: cluster_id,
389 };
390 let resp = match copy_to {
391 None => resp,
392 Some(format) => ExecuteResponse::CopyTo {
393 format,
394 resp: Box::new(resp),
395 },
396 };
397 Ok(StageResult::Response(resp))
398 }
399}