1use anyhow::bail;
32use derivative::Derivative;
33use mz_adapter_types::dyncfgs::ENABLE_INTROSPECTION_SUBSCRIBES;
34use mz_cluster_client::ReplicaId;
35use mz_compute_client::controller::error::ERROR_TARGET_REPLICA_FAILED;
36use mz_compute_client::protocol::response::SubscribeBatch;
37use mz_controller_types::ClusterId;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_panic_or_log;
40use mz_repr::optimize::OverrideFrom;
41use mz_repr::{Datum, GlobalId, Row};
42use mz_sql::catalog::SessionCatalog;
43use mz_sql::plan::{Params, Plan, SubscribePlan};
44use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, RoleMetadata};
45use mz_storage_client::controller::{IntrospectionType, StorageWriteOp};
46use tracing::{Span, info};
47
48use crate::coord::{
49 Coordinator, IntrospectionSubscribeFinish, IntrospectionSubscribeOptimizeMir,
50 IntrospectionSubscribeStage, IntrospectionSubscribeTimestampOptimizeLir, Message, PlanValidity,
51 StageResult, Staged,
52};
53use crate::optimize::Optimize;
54use crate::{AdapterError, ExecuteResponse, optimize};
55
/// Coordinator-side bookkeeping for one installed introspection subscribe.
#[derive(Derivative)]
#[derivative(Debug)]
pub(super) struct IntrospectionSubscribe {
    /// The cluster whose compute controller owns the subscribe's collection.
    cluster_id: ClusterId,
    /// The replica the subscribe is installed on. Its string form is prepended to every row the
    /// subscribe reports and is used to find those rows again when deleting them.
    replica_id: ReplicaId,
    /// The spec this subscribe was created from.
    spec: &'static SubscribeSpec,
    /// A storage write that is applied right before the next non-empty batch of this subscribe is
    /// appended to its target collection.
    ///
    /// It is set when the subscribe is reinstalled, so that rows reported by the previous
    /// incarnation are deleted only once the new incarnation starts producing output.
    // Excluded from the `Debug` output.
    // NOTE(review): presumably because `StorageWriteOp` holds a boxed closure -- confirm.
    #[derivative(Debug = "ignore")]
    deferred_write: Option<StorageWriteOp>,
}
76
77impl IntrospectionSubscribe {
78 fn delete_write_op(&self) -> StorageWriteOp {
81 let target_replica = self.replica_id.to_string();
82 let filter = Box::new(move |row: &Row| {
83 let replica_id = row.unpack_first();
84 replica_id == Datum::String(&target_replica)
85 });
86 StorageWriteOp::Delete { filter }
87 }
88}
89
90impl Coordinator {
91 pub(super) async fn bootstrap_introspection_subscribes(&mut self) {
95 let mut cluster_replicas = Vec::new();
96 for cluster in self.catalog.clusters() {
97 for replica in cluster.replicas() {
98 cluster_replicas.push((cluster.id, replica.replica_id));
99 }
100 }
101
102 for (cluster_id, replica_id) in cluster_replicas {
103 self.install_introspection_subscribes(cluster_id, replica_id)
104 .await;
105 }
106 }
107
108 pub(super) async fn install_introspection_subscribes(
110 &mut self,
111 cluster_id: ClusterId,
112 replica_id: ReplicaId,
113 ) {
114 let dyncfgs = self.catalog().system_config().dyncfgs();
115 if !ENABLE_INTROSPECTION_SUBSCRIBES.get(dyncfgs) {
116 return;
117 }
118
119 for spec in SUBSCRIBES {
120 self.install_introspection_subscribe(cluster_id, replica_id, spec)
121 .await;
122 }
123 }
124
125 async fn install_introspection_subscribe(
126 &mut self,
127 cluster_id: ClusterId,
128 replica_id: ReplicaId,
129 spec: &'static SubscribeSpec,
130 ) {
131 let (_, id) = self.allocate_transient_id();
132 info!(
133 %id,
134 %replica_id,
135 type_ = ?spec.introspection_type,
136 "installing introspection subscribe",
137 );
138
139 let subscribe = IntrospectionSubscribe {
143 cluster_id,
144 replica_id,
145 spec,
146 deferred_write: None,
147 };
148 self.introspection_subscribes.insert(id, subscribe);
149
150 self.sequence_introspection_subscribe(id, spec, cluster_id, replica_id)
151 .await;
152 }
153
154 async fn sequence_introspection_subscribe(
155 &mut self,
156 subscribe_id: GlobalId,
157 spec: &'static SubscribeSpec,
158 cluster_id: ClusterId,
159 replica_id: ReplicaId,
160 ) {
161 let catalog = self.catalog().for_system_session();
162 let plan = spec.to_plan(&catalog).expect("valid spec");
163
164 let role_metadata = RoleMetadata::new(MZ_SYSTEM_ROLE_ID);
165 let dependencies = plan
166 .from
167 .depends_on()
168 .iter()
169 .map(|id| self.catalog().resolve_item_id(id))
170 .collect();
171 let validity = PlanValidity::new(
172 self.catalog.transient_revision(),
173 dependencies,
174 Some(cluster_id),
175 Some(replica_id),
176 role_metadata,
177 );
178
179 let stage = IntrospectionSubscribeStage::OptimizeMir(IntrospectionSubscribeOptimizeMir {
180 validity,
181 plan,
182 subscribe_id,
183 cluster_id,
184 replica_id,
185 });
186 self.sequence_staged((), Span::current(), stage).await;
187 }
188
189 fn sequence_introspection_subscribe_optimize_mir(
190 &self,
191 stage: IntrospectionSubscribeOptimizeMir,
192 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
193 let IntrospectionSubscribeOptimizeMir {
194 mut validity,
195 plan,
196 subscribe_id,
197 cluster_id,
198 replica_id,
199 } = stage;
200
201 let compute_instance = self.instance_snapshot(cluster_id).expect("must exist");
202 let (_, view_id) = self.allocate_transient_id();
203
204 let vars = self.catalog().system_config();
205 let overrides = self.catalog.get_cluster(cluster_id).config.features();
206 let optimizer_config = optimize::OptimizerConfig::from(vars)
207 .override_from(&overrides)
208 .override_from(&self.cluster_scoped_optimizer_overrides(cluster_id));
209
210 let mut optimizer = optimize::subscribe::Optimizer::new(
211 self.owned_catalog(),
212 compute_instance,
213 view_id,
214 subscribe_id,
215 plan.with_snapshot,
216 None,
217 format!("introspection-subscribe-{subscribe_id}"),
218 optimizer_config,
219 self.optimizer_metrics(),
220 );
221 let catalog = self.owned_catalog();
222
223 let span = Span::current();
224 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
225 || "optimize introspection subscribe (mir)",
226 move || {
227 span.in_scope(|| {
228 let global_mir_plan = optimizer.catch_unwind_optimize(plan)?;
230 let id_bundle = global_mir_plan.id_bundle(cluster_id);
232 let item_ids = id_bundle.iter().map(|id| catalog.resolve_item_id(&id));
233 validity.extend_dependencies(item_ids);
234
235 let stage = IntrospectionSubscribeStage::TimestampOptimizeLir(
236 IntrospectionSubscribeTimestampOptimizeLir {
237 validity,
238 optimizer,
239 global_mir_plan,
240 cluster_id,
241 replica_id,
242 },
243 );
244 Ok(Box::new(stage))
245 })
246 },
247 )))
248 }
249
250 fn sequence_introspection_subscribe_timestamp_optimize_lir(
251 &self,
252 stage: IntrospectionSubscribeTimestampOptimizeLir,
253 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
254 let IntrospectionSubscribeTimestampOptimizeLir {
255 validity,
256 mut optimizer,
257 global_mir_plan,
258 cluster_id,
259 replica_id,
260 } = stage;
261
262 let id_bundle = global_mir_plan.id_bundle(cluster_id);
264 let read_holds = self.acquire_read_holds(&id_bundle);
265 let as_of = read_holds.least_valid_read();
266
267 let global_mir_plan = global_mir_plan.resolve(as_of);
268
269 let span = Span::current();
270 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
271 || "optimize introspection subscribe (lir)",
272 move || {
273 span.in_scope(|| {
274 let global_lir_plan =
276 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
277
278 let stage = IntrospectionSubscribeStage::Finish(IntrospectionSubscribeFinish {
279 validity,
280 global_lir_plan,
281 read_holds,
282 cluster_id,
283 replica_id,
284 });
285 Ok(Box::new(stage))
286 })
287 },
288 )))
289 }
290
291 async fn sequence_introspection_subscribe_finish(
292 &mut self,
293 stage: IntrospectionSubscribeFinish,
294 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
295 let IntrospectionSubscribeFinish {
296 validity: _,
297 global_lir_plan,
298 read_holds,
299 cluster_id,
300 replica_id,
301 } = stage;
302
303 let subscribe_id = global_lir_plan.sink_id();
304
305 let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
308 let (df_desc, _df_meta) = global_lir_plan.unapply();
309 self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
310 .await;
311
312 Ok(StageResult::Response(
313 ExecuteResponse::CreatedIntrospectionSubscribe,
314 ))
315 } else {
316 Err(AdapterError::internal(
317 "introspection",
318 "introspection subscribe has already been dropped",
319 ))
320 };
321
322 drop(read_holds);
323 response
324 }
325
326 pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId) {
334 let to_drop: Vec<_> = self
335 .introspection_subscribes
336 .iter()
337 .filter(|(_, s)| s.replica_id == replica_id)
338 .map(|(id, _)| *id)
339 .collect();
340
341 for id in to_drop {
342 self.drop_introspection_subscribe(id);
343 }
344 }
345
346 fn drop_introspection_subscribe(&mut self, id: GlobalId) {
347 let Some(subscribe) = self.introspection_subscribes.remove(&id) else {
348 soft_panic_or_log!("attempt to remove unknown introspection subscribe (id={id})");
349 return;
350 };
351
352 info!(
353 %id,
354 replica_id = %subscribe.replica_id,
355 type_ = ?subscribe.spec.introspection_type,
356 "dropping introspection subscribe",
357 );
358
359 let _ = self
363 .controller
364 .compute
365 .drop_collections(subscribe.cluster_id, vec![id]);
366
367 self.controller.storage.update_introspection_collection(
368 subscribe.spec.introspection_type,
369 subscribe.delete_write_op(),
370 );
371 }
372
373 async fn reinstall_introspection_subscribe(&mut self, id: GlobalId) {
374 let Some(mut subscribe) = self.introspection_subscribes.remove(&id) else {
375 soft_panic_or_log!("attempt to reinstall unknown introspection subscribe (id={id})");
376 return;
377 };
378
379 let IntrospectionSubscribe {
385 cluster_id,
386 replica_id,
387 spec,
388 ..
389 } = subscribe;
390 let old_id = id;
391 let (_, new_id) = self.allocate_transient_id();
392
393 info!(
394 %old_id, %new_id, %replica_id,
395 type_ = ?subscribe.spec.introspection_type,
396 "reinstalling introspection subscribe",
397 );
398
399 if let Err(error) = self
400 .controller
401 .compute
402 .drop_collections(cluster_id, vec![old_id])
403 {
404 soft_panic_or_log!(
405 "error dropping compute collection for introspection subscribe: {error} \
406 (id={old_id}, cluster_id={cluster_id})"
407 );
408 }
409
410 subscribe.deferred_write = Some(subscribe.delete_write_op());
413
414 self.introspection_subscribes.insert(new_id, subscribe);
415 self.sequence_introspection_subscribe(new_id, spec, cluster_id, replica_id)
416 .await;
417 }
418
419 pub(super) async fn handle_introspection_subscribe_batch(
424 &mut self,
425 id: GlobalId,
426 batch: SubscribeBatch,
427 ) {
428 let Some(subscribe) = self.introspection_subscribes.get_mut(&id) else {
429 soft_panic_or_log!("updates for unknown introspection subscribe (id={id})");
430 return;
431 };
432
433 let updates = match batch.updates {
434 Ok(updates) if updates.is_empty() => return,
435 Ok(updates) => updates,
436 Err(error) if error == ERROR_TARGET_REPLICA_FAILED => {
437 self.reinstall_introspection_subscribe(id).await;
439 return;
440 }
441 Err(error) => {
442 soft_panic_or_log!(
443 "introspection subscribe produced an error: {error} \
444 (id={id}, subscribe={subscribe:?})",
445 );
446 return;
447 }
448 };
449
450 let replica_id = subscribe.replica_id.to_string();
452 let mut new_updates = Vec::with_capacity(updates.len());
453 let mut new_row = Row::default();
454 for collection in updates {
455 for (row, _time, diff) in collection.iter() {
456 let mut packer = new_row.packer();
457 packer.push(Datum::String(&replica_id));
458 packer.extend_by_row_ref(row);
459 new_updates.push((new_row.clone(), diff));
460 }
461 }
462
463 if let Some(op) = subscribe.deferred_write.take() {
466 self.controller
467 .storage
468 .update_introspection_collection(subscribe.spec.introspection_type, op);
469 }
470
471 self.controller.storage.update_introspection_collection(
472 subscribe.spec.introspection_type,
473 StorageWriteOp::Append {
474 updates: new_updates,
475 },
476 );
477 }
478}
479
480impl Staged for IntrospectionSubscribeStage {
481 type Ctx = ();
482
483 fn validity(&mut self) -> &mut PlanValidity {
484 match self {
485 Self::OptimizeMir(stage) => &mut stage.validity,
486 Self::TimestampOptimizeLir(stage) => &mut stage.validity,
487 Self::Finish(stage) => &mut stage.validity,
488 }
489 }
490
491 async fn stage(
492 self,
493 coord: &mut Coordinator,
494 _ctx: &mut (),
495 ) -> Result<StageResult<Box<Self>>, AdapterError> {
496 match self {
497 Self::OptimizeMir(stage) => coord.sequence_introspection_subscribe_optimize_mir(stage),
498 Self::TimestampOptimizeLir(stage) => {
499 coord.sequence_introspection_subscribe_timestamp_optimize_lir(stage)
500 }
501 Self::Finish(stage) => coord.sequence_introspection_subscribe_finish(stage).await,
502 }
503 }
504
505 fn message(self, _ctx: (), span: Span) -> super::Message {
506 Message::IntrospectionSubscribeStageReady { span, stage: self }
507 }
508
509 fn cancel_enabled(&self) -> bool {
510 false
511 }
512}
513
/// Static description of an introspection subscribe.
#[derive(Debug)]
pub(super) struct SubscribeSpec {
    /// The introspection collection that the subscribe's results are written to.
    introspection_type: IntrospectionType,
    /// The SQL of the subscribe; it must plan to a `Plan::Subscribe` (see `to_plan`).
    sql: &'static str,
}
523
524impl SubscribeSpec {
525 fn to_plan(&self, catalog: &dyn SessionCatalog) -> Result<SubscribePlan, anyhow::Error> {
526 let parsed = mz_sql::parse::parse(self.sql)?.into_element();
527 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, parsed.ast)?;
528 let (plan, _sql_impl_ids) =
529 mz_sql::plan::plan(None, catalog, stmt, &Params::empty(), &resolved_ids)?;
530 match plan {
531 Plan::Subscribe(plan) => Ok(plan),
532 _ => bail!("unexpected plan type: {plan:?}"),
533 }
534 }
535}
536
/// The introspection subscribes that are installed on every replica.
///
/// The replica ID is prepended to each reported row before it is written to the collection
/// identified by `introspection_type`, so the queries themselves do not select it.
const SUBSCRIBES: &[SubscribeSpec] = &[
    // Total error count per export.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeErrorCounts,
        sql: "SUBSCRIBE (
            SELECT export_id, sum(count)
            FROM mz_introspection.mz_compute_error_counts_raw
            GROUP BY export_id
        )",
    },
    // Hydration time per export: the maximum `time_ns` across the per-worker rows, or NULL while
    // any of those rows still has a NULL `time_ns`. Export IDs starting with `t` are excluded.
    // NOTE(review): presumably the `t` prefix marks transient exports -- confirm.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeHydrationTimes,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                CASE count(*) = count(time_ns)
                    WHEN true THEN max(time_ns)
                    ELSE NULL
                END AS time_ns
            FROM mz_introspection.mz_compute_hydration_times_per_worker
            WHERE export_id NOT LIKE 't%'
            GROUP BY export_id
            OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)
        )",
    },
    // Hydration status per (export, LIR node): hydrated only if every per-worker row is.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeOperatorHydrationStatus,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                lir_id,
                bool_and(hydrated) AS hydrated
            FROM mz_introspection.mz_compute_operator_hydration_statuses_per_worker
            GROUP BY export_id, lir_id
        )",
    },
    // Arrangement size per export: counts the rows of the raw heap-size and batcher-size
    // relations that belong to the export's dataflow, rounded to the nearest multiple of
    // 10485760 (10 * 2^20). Exports with fewer than 10485760 such rows are not reported, and
    // export IDs starting with `t` are excluded.
    // NOTE(review): presumably each raw row stands for one byte, making the unit 10 MiB, and the
    // coarse rounding limits how often the reported value changes -- confirm.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeObjectArrangementSizes,
        sql: "SUBSCRIBE (
            SELECT
                ce.export_id AS object_id,
                ((COUNT(*) + 5242880) / 10485760 * 10485760)::int8 AS size
            FROM mz_introspection.mz_compute_exports AS ce
            JOIN (
                SELECT addrs.address[1] AS dataflow_id, addrs.id AS operator_id
                FROM mz_introspection.mz_dataflow_addresses addrs
            ) AS od ON od.dataflow_id = ce.dataflow_id
            JOIN (
                SELECT operator_id FROM mz_introspection.mz_arrangement_heap_size_raw
                UNION ALL
                SELECT operator_id FROM mz_introspection.mz_arrangement_batcher_size_raw
            ) AS rs ON rs.operator_id = od.operator_id
            WHERE ce.export_id NOT LIKE 't%'
            GROUP BY ce.export_id
            HAVING COUNT(*) >= 10485760
        )",
    },
];