1use anyhow::bail;
32use derivative::Derivative;
33use mz_adapter_types::dyncfgs::ENABLE_INTROSPECTION_SUBSCRIBES;
34use mz_cluster_client::ReplicaId;
35use mz_compute_client::controller::error::ERROR_TARGET_REPLICA_FAILED;
36use mz_compute_client::protocol::response::SubscribeBatch;
37use mz_controller_types::ClusterId;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_panic_or_log;
40use mz_repr::optimize::OverrideFrom;
41use mz_repr::{Datum, GlobalId, Row};
42use mz_sql::catalog::SessionCatalog;
43use mz_sql::plan::{Params, Plan, SubscribePlan};
44use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, RoleMetadata};
45use mz_storage_client::controller::{IntrospectionType, StorageWriteOp};
46use tracing::{Span, info};
47
48use crate::coord::{
49 Coordinator, IntrospectionSubscribeFinish, IntrospectionSubscribeOptimizeMir,
50 IntrospectionSubscribeStage, IntrospectionSubscribeTimestampOptimizeLir, Message, PlanValidity,
51 StageResult, Staged,
52};
53use crate::optimize::Optimize;
54use crate::{AdapterError, ExecuteResponse, optimize};
55
/// Bookkeeping for a single introspection subscribe installed on one replica.
///
/// Output rows of the subscribe are tagged with `replica_id` and written to the storage
/// introspection collection named by `spec`.
#[derive(Derivative)]
#[derivative(Debug)]
pub(super) struct IntrospectionSubscribe {
    /// The cluster whose replica runs the subscribe's dataflow.
    cluster_id: ClusterId,
    /// The replica the subscribe targets. Its string form is prepended to every output row
    /// and is what `delete_write_op` matches on.
    replica_id: ReplicaId,
    /// Static description of the subscribe: target collection and SQL.
    spec: &'static SubscribeSpec,
    /// A storage write to apply right before the next non-empty batch is appended.
    ///
    /// Set on reinstall to a delete of the rows previously reported for this replica, so
    /// the old rows are not removed until the replacement subscribe produces output.
    // Excluded from `Debug` output, presumably because `StorageWriteOp` does not
    // implement `Debug` — confirm.
    #[derivative(Debug = "ignore")]
    deferred_write: Option<StorageWriteOp>,
}
76
77impl IntrospectionSubscribe {
78 fn delete_write_op(&self) -> StorageWriteOp {
81 let target_replica = self.replica_id.to_string();
82 let filter = Box::new(move |row: &Row| {
83 let replica_id = row.unpack_first();
84 replica_id == Datum::String(&target_replica)
85 });
86 StorageWriteOp::Delete { filter }
87 }
88}
89
90impl Coordinator {
91 pub(super) async fn bootstrap_introspection_subscribes(&mut self) {
95 let mut cluster_replicas = Vec::new();
96 for cluster in self.catalog.clusters() {
97 for replica in cluster.replicas() {
98 cluster_replicas.push((cluster.id, replica.replica_id));
99 }
100 }
101
102 for (cluster_id, replica_id) in cluster_replicas {
103 self.install_introspection_subscribes(cluster_id, replica_id)
104 .await;
105 }
106 }
107
108 pub(super) async fn install_introspection_subscribes(
110 &mut self,
111 cluster_id: ClusterId,
112 replica_id: ReplicaId,
113 ) {
114 let dyncfgs = self.catalog().system_config().dyncfgs();
115 if !ENABLE_INTROSPECTION_SUBSCRIBES.get(dyncfgs) {
116 return;
117 }
118
119 for spec in SUBSCRIBES {
120 self.install_introspection_subscribe(cluster_id, replica_id, spec)
121 .await;
122 }
123 }
124
125 async fn install_introspection_subscribe(
126 &mut self,
127 cluster_id: ClusterId,
128 replica_id: ReplicaId,
129 spec: &'static SubscribeSpec,
130 ) {
131 let (_, id) = self.allocate_transient_id();
132 info!(
133 %id,
134 %replica_id,
135 type_ = ?spec.introspection_type,
136 "installing introspection subscribe",
137 );
138
139 let subscribe = IntrospectionSubscribe {
143 cluster_id,
144 replica_id,
145 spec,
146 deferred_write: None,
147 };
148 self.introspection_subscribes.insert(id, subscribe);
149
150 self.sequence_introspection_subscribe(id, spec, cluster_id, replica_id)
151 .await;
152 }
153
154 async fn sequence_introspection_subscribe(
155 &mut self,
156 subscribe_id: GlobalId,
157 spec: &'static SubscribeSpec,
158 cluster_id: ClusterId,
159 replica_id: ReplicaId,
160 ) {
161 let catalog = self.catalog().for_system_session();
162 let plan = spec.to_plan(&catalog).expect("valid spec");
163
164 let role_metadata = RoleMetadata::new(MZ_SYSTEM_ROLE_ID);
165 let dependencies = plan
166 .from
167 .depends_on()
168 .iter()
169 .map(|id| self.catalog().resolve_item_id(id))
170 .collect();
171 let validity = PlanValidity::new(
172 self.catalog.transient_revision(),
173 dependencies,
174 Some(cluster_id),
175 Some(replica_id),
176 role_metadata,
177 );
178
179 let stage = IntrospectionSubscribeStage::OptimizeMir(IntrospectionSubscribeOptimizeMir {
180 validity,
181 plan,
182 subscribe_id,
183 cluster_id,
184 replica_id,
185 });
186 self.sequence_staged((), Span::current(), stage).await;
187 }
188
189 fn sequence_introspection_subscribe_optimize_mir(
190 &self,
191 stage: IntrospectionSubscribeOptimizeMir,
192 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
193 let IntrospectionSubscribeOptimizeMir {
194 mut validity,
195 plan,
196 subscribe_id,
197 cluster_id,
198 replica_id,
199 } = stage;
200
201 let compute_instance = self.instance_snapshot(cluster_id).expect("must exist");
202 let (_, view_id) = self.allocate_transient_id();
203
204 let vars = self.catalog().system_config();
205 let overrides = self.catalog.get_cluster(cluster_id).config.features();
206 let optimizer_config = optimize::OptimizerConfig::from(vars).override_from(&overrides);
207
208 let mut optimizer = optimize::subscribe::Optimizer::new(
209 self.owned_catalog(),
210 compute_instance,
211 view_id,
212 subscribe_id,
213 plan.with_snapshot,
214 None,
215 format!("introspection-subscribe-{subscribe_id}"),
216 optimizer_config,
217 self.optimizer_metrics(),
218 );
219 let catalog = self.owned_catalog();
220
221 let span = Span::current();
222 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
223 || "optimize introspection subscribe (mir)",
224 move || {
225 span.in_scope(|| {
226 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from)?;
228 let id_bundle = global_mir_plan.id_bundle(cluster_id);
230 let item_ids = id_bundle.iter().map(|id| catalog.resolve_item_id(&id));
231 validity.extend_dependencies(item_ids);
232
233 let stage = IntrospectionSubscribeStage::TimestampOptimizeLir(
234 IntrospectionSubscribeTimestampOptimizeLir {
235 validity,
236 optimizer,
237 global_mir_plan,
238 cluster_id,
239 replica_id,
240 },
241 );
242 Ok(Box::new(stage))
243 })
244 },
245 )))
246 }
247
248 fn sequence_introspection_subscribe_timestamp_optimize_lir(
249 &self,
250 stage: IntrospectionSubscribeTimestampOptimizeLir,
251 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
252 let IntrospectionSubscribeTimestampOptimizeLir {
253 validity,
254 mut optimizer,
255 global_mir_plan,
256 cluster_id,
257 replica_id,
258 } = stage;
259
260 let id_bundle = global_mir_plan.id_bundle(cluster_id);
262 let read_holds = self.acquire_read_holds(&id_bundle);
263 let as_of = read_holds.least_valid_read();
264
265 let global_mir_plan = global_mir_plan.resolve(as_of);
266
267 let span = Span::current();
268 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
269 || "optimize introspection subscribe (lir)",
270 move || {
271 span.in_scope(|| {
272 let global_lir_plan =
274 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
275
276 let stage = IntrospectionSubscribeStage::Finish(IntrospectionSubscribeFinish {
277 validity,
278 global_lir_plan,
279 read_holds,
280 cluster_id,
281 replica_id,
282 });
283 Ok(Box::new(stage))
284 })
285 },
286 )))
287 }
288
289 async fn sequence_introspection_subscribe_finish(
290 &mut self,
291 stage: IntrospectionSubscribeFinish,
292 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
293 let IntrospectionSubscribeFinish {
294 validity: _,
295 global_lir_plan,
296 read_holds,
297 cluster_id,
298 replica_id,
299 } = stage;
300
301 let subscribe_id = global_lir_plan.sink_id();
302
303 let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
306 let (df_desc, _df_meta) = global_lir_plan.unapply();
307 self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
308 .await;
309
310 Ok(StageResult::Response(
311 ExecuteResponse::CreatedIntrospectionSubscribe,
312 ))
313 } else {
314 Err(AdapterError::internal(
315 "introspection",
316 "introspection subscribe has already been dropped",
317 ))
318 };
319
320 drop(read_holds);
321 response
322 }
323
324 pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId) {
332 let to_drop: Vec<_> = self
333 .introspection_subscribes
334 .iter()
335 .filter(|(_, s)| s.replica_id == replica_id)
336 .map(|(id, _)| *id)
337 .collect();
338
339 for id in to_drop {
340 self.drop_introspection_subscribe(id);
341 }
342 }
343
344 fn drop_introspection_subscribe(&mut self, id: GlobalId) {
345 let Some(subscribe) = self.introspection_subscribes.remove(&id) else {
346 soft_panic_or_log!("attempt to remove unknown introspection subscribe (id={id})");
347 return;
348 };
349
350 info!(
351 %id,
352 replica_id = %subscribe.replica_id,
353 type_ = ?subscribe.spec.introspection_type,
354 "dropping introspection subscribe",
355 );
356
357 let _ = self
361 .controller
362 .compute
363 .drop_collections(subscribe.cluster_id, vec![id]);
364
365 self.controller.storage.update_introspection_collection(
366 subscribe.spec.introspection_type,
367 subscribe.delete_write_op(),
368 );
369 }
370
371 async fn reinstall_introspection_subscribe(&mut self, id: GlobalId) {
372 let Some(mut subscribe) = self.introspection_subscribes.remove(&id) else {
373 soft_panic_or_log!("attempt to reinstall unknown introspection subscribe (id={id})");
374 return;
375 };
376
377 let IntrospectionSubscribe {
383 cluster_id,
384 replica_id,
385 spec,
386 ..
387 } = subscribe;
388 let old_id = id;
389 let (_, new_id) = self.allocate_transient_id();
390
391 info!(
392 %old_id, %new_id, %replica_id,
393 type_ = ?subscribe.spec.introspection_type,
394 "reinstalling introspection subscribe",
395 );
396
397 if let Err(error) = self
398 .controller
399 .compute
400 .drop_collections(cluster_id, vec![old_id])
401 {
402 soft_panic_or_log!(
403 "error dropping compute collection for introspection subscribe: {error} \
404 (id={old_id}, cluster_id={cluster_id})"
405 );
406 }
407
408 subscribe.deferred_write = Some(subscribe.delete_write_op());
411
412 self.introspection_subscribes.insert(new_id, subscribe);
413 self.sequence_introspection_subscribe(new_id, spec, cluster_id, replica_id)
414 .await;
415 }
416
417 pub(super) async fn handle_introspection_subscribe_batch(
422 &mut self,
423 id: GlobalId,
424 batch: SubscribeBatch,
425 ) {
426 let Some(subscribe) = self.introspection_subscribes.get_mut(&id) else {
427 soft_panic_or_log!("updates for unknown introspection subscribe (id={id})");
428 return;
429 };
430
431 let updates = match batch.updates {
432 Ok(updates) if updates.is_empty() => return,
433 Ok(updates) => updates,
434 Err(error) if error == ERROR_TARGET_REPLICA_FAILED => {
435 self.reinstall_introspection_subscribe(id).await;
437 return;
438 }
439 Err(error) => {
440 soft_panic_or_log!(
441 "introspection subscribe produced an error: {error} \
442 (id={id}, subscribe={subscribe:?})",
443 );
444 return;
445 }
446 };
447
448 let replica_id = subscribe.replica_id.to_string();
450 let mut new_updates = Vec::with_capacity(updates.len());
451 let mut new_row = Row::default();
452 for (_time, row, diff) in updates {
453 let mut packer = new_row.packer();
454 packer.push(Datum::String(&replica_id));
455 packer.extend_by_row(&row);
456 new_updates.push((new_row.clone(), diff));
457 }
458
459 if let Some(op) = subscribe.deferred_write.take() {
462 self.controller
463 .storage
464 .update_introspection_collection(subscribe.spec.introspection_type, op);
465 }
466
467 self.controller.storage.update_introspection_collection(
468 subscribe.spec.introspection_type,
469 StorageWriteOp::Append {
470 updates: new_updates,
471 },
472 );
473 }
474}
475
476impl Staged for IntrospectionSubscribeStage {
477 type Ctx = ();
478
479 fn validity(&mut self) -> &mut PlanValidity {
480 match self {
481 Self::OptimizeMir(stage) => &mut stage.validity,
482 Self::TimestampOptimizeLir(stage) => &mut stage.validity,
483 Self::Finish(stage) => &mut stage.validity,
484 }
485 }
486
487 async fn stage(
488 self,
489 coord: &mut Coordinator,
490 _ctx: &mut (),
491 ) -> Result<StageResult<Box<Self>>, AdapterError> {
492 match self {
493 Self::OptimizeMir(stage) => coord.sequence_introspection_subscribe_optimize_mir(stage),
494 Self::TimestampOptimizeLir(stage) => {
495 coord.sequence_introspection_subscribe_timestamp_optimize_lir(stage)
496 }
497 Self::Finish(stage) => coord.sequence_introspection_subscribe_finish(stage).await,
498 }
499 }
500
501 fn message(self, _ctx: (), span: Span) -> super::Message {
502 Message::IntrospectionSubscribeStageReady { span, stage: self }
503 }
504
505 fn cancel_enabled(&self) -> bool {
506 false
507 }
508}
509
/// Static description of an introspection subscribe.
#[derive(Debug)]
pub(super) struct SubscribeSpec {
    /// The storage introspection collection that receives the subscribe's output.
    ///
    /// Each output row is written with the replica ID prepended as its first datum.
    introspection_type: IntrospectionType,
    /// The `SUBSCRIBE` statement to run, planned against the system session's catalog.
    ///
    /// It must plan to a `SUBSCRIBE`; the coordinator panics ("valid spec") otherwise.
    sql: &'static str,
}
519
520impl SubscribeSpec {
521 fn to_plan(&self, catalog: &dyn SessionCatalog) -> Result<SubscribePlan, anyhow::Error> {
522 let parsed = mz_sql::parse::parse(self.sql)?.into_element();
523 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, parsed.ast)?;
524 let plan = mz_sql::plan::plan(None, catalog, stmt, &Params::empty(), &resolved_ids)?;
525 match plan {
526 Plan::Subscribe(plan) => Ok(plan),
527 _ => bail!("unexpected plan type: {plan:?}"),
528 }
529 }
530}
531
/// The introspection subscribes installed on every replica when
/// `ENABLE_INTROSPECTION_SUBSCRIBES` is on.
const SUBSCRIBES: &[SubscribeSpec] = &[
    // Total error count per export, summed over all rows of `mz_compute_error_counts_raw`.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeErrorCounts,
        sql: "SUBSCRIBE (
            SELECT export_id, sum(count)
            FROM mz_introspection.mz_compute_error_counts_raw
            GROUP BY export_id
        )",
    },
    // Per-export hydration time: the max `time_ns` across the per-worker rows. It is NULL
    // while any row still has a NULL `time_ns` (count(*) <> count(time_ns)). Exports whose
    // ID starts with 't' are excluded (presumably transient exports — confirm).
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeHydrationTimes,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                CASE count(*) = count(time_ns)
                    WHEN true THEN max(time_ns)
                    ELSE NULL
                END AS time_ns
            FROM mz_introspection.mz_compute_hydration_times_per_worker
            WHERE export_id NOT LIKE 't%'
            GROUP BY export_id
            OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)
        )",
    },
    // Per (export, LIR node): hydrated only if every per-worker row reports hydrated.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeOperatorHydrationStatus,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                lir_id,
                bool_and(hydrated) AS hydrated
            FROM mz_introspection.mz_compute_operator_hydration_statuses_per_worker
            GROUP BY export_id, lir_id
        )",
    },
];