1use anyhow::bail;
32use derivative::Derivative;
33use mz_adapter_types::dyncfgs::ENABLE_INTROSPECTION_SUBSCRIBES;
34use mz_cluster_client::ReplicaId;
35use mz_compute_client::controller::error::ERROR_TARGET_REPLICA_FAILED;
36use mz_compute_client::protocol::response::SubscribeBatch;
37use mz_controller_types::ClusterId;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_panic_or_log;
40use mz_repr::optimize::OverrideFrom;
41use mz_repr::{Datum, GlobalId, Row};
42use mz_sql::catalog::SessionCatalog;
43use mz_sql::plan::{Params, Plan, SubscribePlan};
44use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, RoleMetadata};
45use mz_storage_client::controller::{IntrospectionType, StorageWriteOp};
46use tracing::{Span, info};
47
48use crate::coord::{
49 Coordinator, IntrospectionSubscribeFinish, IntrospectionSubscribeOptimizeMir,
50 IntrospectionSubscribeStage, IntrospectionSubscribeTimestampOptimizeLir, Message, PlanValidity,
51 StageResult, Staged,
52};
53use crate::optimize::Optimize;
54use crate::{AdapterError, ExecuteResponse, optimize};
55
/// Coordinator-side bookkeeping for one installed introspection subscribe.
///
/// Instances are stored in the coordinator's `introspection_subscribes` map, keyed by the
/// subscribe's `GlobalId`.
#[derive(Derivative)]
#[derivative(Debug)]
pub(super) struct IntrospectionSubscribe {
    /// The cluster on which the subscribe is installed.
    cluster_id: ClusterId,
    /// The replica the subscribe targets. Its string form is also the first column of every row
    /// this subscribe writes to its introspection collection.
    replica_id: ReplicaId,
    /// The static spec (introspection type and SQL) this subscribe was created from.
    spec: &'static SubscribeSpec,
    /// A storage write that is held back until the subscribe next delivers a non-empty batch.
    ///
    /// `reinstall_introspection_subscribe` stores a delete op here, and
    /// `handle_introspection_subscribe_batch` applies it immediately before appending the new
    /// rows. The field is excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    deferred_write: Option<StorageWriteOp>,
}
76
77impl IntrospectionSubscribe {
78 fn delete_write_op(&self) -> StorageWriteOp {
81 let target_replica = self.replica_id.to_string();
82 let filter = Box::new(move |row: &Row| {
83 let replica_id = row.unpack_first();
84 replica_id == Datum::String(&target_replica)
85 });
86 StorageWriteOp::Delete { filter }
87 }
88}
89
90impl Coordinator {
91 pub(super) async fn bootstrap_introspection_subscribes(&mut self) {
95 let mut cluster_replicas = Vec::new();
96 for cluster in self.catalog.clusters() {
97 for replica in cluster.replicas() {
98 cluster_replicas.push((cluster.id, replica.replica_id));
99 }
100 }
101
102 for (cluster_id, replica_id) in cluster_replicas {
103 self.install_introspection_subscribes(cluster_id, replica_id)
104 .await;
105 }
106 }
107
108 pub(super) async fn install_introspection_subscribes(
110 &mut self,
111 cluster_id: ClusterId,
112 replica_id: ReplicaId,
113 ) {
114 let dyncfgs = self.catalog().system_config().dyncfgs();
115 if !ENABLE_INTROSPECTION_SUBSCRIBES.get(dyncfgs) {
116 return;
117 }
118
119 for spec in SUBSCRIBES {
120 self.install_introspection_subscribe(cluster_id, replica_id, spec)
121 .await;
122 }
123 }
124
125 async fn install_introspection_subscribe(
126 &mut self,
127 cluster_id: ClusterId,
128 replica_id: ReplicaId,
129 spec: &'static SubscribeSpec,
130 ) {
131 let (_, id) = self.allocate_transient_id();
132 info!(
133 %id,
134 %replica_id,
135 type_ = ?spec.introspection_type,
136 "installing introspection subscribe",
137 );
138
139 let subscribe = IntrospectionSubscribe {
143 cluster_id,
144 replica_id,
145 spec,
146 deferred_write: None,
147 };
148 self.introspection_subscribes.insert(id, subscribe);
149
150 self.sequence_introspection_subscribe(id, spec, cluster_id, replica_id)
151 .await;
152 }
153
154 async fn sequence_introspection_subscribe(
155 &mut self,
156 subscribe_id: GlobalId,
157 spec: &'static SubscribeSpec,
158 cluster_id: ClusterId,
159 replica_id: ReplicaId,
160 ) {
161 let catalog = self.catalog().for_system_session();
162 let plan = spec.to_plan(&catalog).expect("valid spec");
163
164 let role_metadata = RoleMetadata::new(MZ_SYSTEM_ROLE_ID);
165 let dependencies = plan
166 .from
167 .depends_on()
168 .iter()
169 .map(|id| self.catalog().resolve_item_id(id))
170 .collect();
171 let validity = PlanValidity::new(
172 self.catalog.transient_revision(),
173 dependencies,
174 Some(cluster_id),
175 Some(replica_id),
176 role_metadata,
177 );
178
179 let stage = IntrospectionSubscribeStage::OptimizeMir(IntrospectionSubscribeOptimizeMir {
180 validity,
181 plan,
182 subscribe_id,
183 cluster_id,
184 replica_id,
185 });
186 self.sequence_staged((), Span::current(), stage).await;
187 }
188
189 fn sequence_introspection_subscribe_optimize_mir(
190 &self,
191 stage: IntrospectionSubscribeOptimizeMir,
192 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
193 let IntrospectionSubscribeOptimizeMir {
194 mut validity,
195 plan,
196 subscribe_id,
197 cluster_id,
198 replica_id,
199 } = stage;
200
201 let compute_instance = self.instance_snapshot(cluster_id).expect("must exist");
202 let (_, view_id) = self.allocate_transient_id();
203
204 let vars = self.catalog().system_config();
205 let overrides = self.catalog.get_cluster(cluster_id).config.features();
206 let optimizer_config = optimize::OptimizerConfig::from(vars).override_from(&overrides);
207
208 let mut optimizer = optimize::subscribe::Optimizer::new(
209 self.owned_catalog(),
210 compute_instance,
211 view_id,
212 subscribe_id,
213 None,
214 plan.with_snapshot,
215 None,
216 format!("introspection-subscribe-{subscribe_id}"),
217 optimizer_config,
218 self.optimizer_metrics(),
219 );
220 let catalog = self.owned_catalog();
221
222 let span = Span::current();
223 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
224 || "optimize introspection subscribe (mir)",
225 move || {
226 span.in_scope(|| {
227 let global_mir_plan = optimizer.catch_unwind_optimize(plan.from)?;
229 let id_bundle = global_mir_plan.id_bundle(cluster_id);
231 let item_ids = id_bundle.iter().map(|id| catalog.resolve_item_id(&id));
232 validity.extend_dependencies(item_ids);
233
234 let stage = IntrospectionSubscribeStage::TimestampOptimizeLir(
235 IntrospectionSubscribeTimestampOptimizeLir {
236 validity,
237 optimizer,
238 global_mir_plan,
239 cluster_id,
240 replica_id,
241 },
242 );
243 Ok(Box::new(stage))
244 })
245 },
246 )))
247 }
248
249 fn sequence_introspection_subscribe_timestamp_optimize_lir(
250 &self,
251 stage: IntrospectionSubscribeTimestampOptimizeLir,
252 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
253 let IntrospectionSubscribeTimestampOptimizeLir {
254 validity,
255 mut optimizer,
256 global_mir_plan,
257 cluster_id,
258 replica_id,
259 } = stage;
260
261 let id_bundle = global_mir_plan.id_bundle(cluster_id);
263 let read_holds = self.acquire_read_holds(&id_bundle);
264 let as_of = read_holds.least_valid_read();
265
266 let global_mir_plan = global_mir_plan.resolve(as_of);
267
268 let span = Span::current();
269 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
270 || "optimize introspection subscribe (lir)",
271 move || {
272 span.in_scope(|| {
273 let global_lir_plan =
275 optimizer.catch_unwind_optimize(global_mir_plan.clone())?;
276
277 let stage = IntrospectionSubscribeStage::Finish(IntrospectionSubscribeFinish {
278 validity,
279 global_lir_plan,
280 read_holds,
281 cluster_id,
282 replica_id,
283 });
284 Ok(Box::new(stage))
285 })
286 },
287 )))
288 }
289
290 async fn sequence_introspection_subscribe_finish(
291 &mut self,
292 stage: IntrospectionSubscribeFinish,
293 ) -> Result<StageResult<Box<IntrospectionSubscribeStage>>, AdapterError> {
294 let IntrospectionSubscribeFinish {
295 validity: _,
296 global_lir_plan,
297 read_holds,
298 cluster_id,
299 replica_id,
300 } = stage;
301
302 let subscribe_id = global_lir_plan.sink_id();
303
304 let response = if self.introspection_subscribes.contains_key(&subscribe_id) {
307 let (df_desc, _df_meta) = global_lir_plan.unapply();
308 self.ship_dataflow(df_desc, cluster_id, Some(replica_id))
309 .await;
310
311 Ok(StageResult::Response(
312 ExecuteResponse::CreatedIntrospectionSubscribe,
313 ))
314 } else {
315 Err(AdapterError::internal(
316 "introspection",
317 "introspection subscribe has already been dropped",
318 ))
319 };
320
321 drop(read_holds);
322 response
323 }
324
325 pub(super) fn drop_introspection_subscribes(&mut self, replica_id: ReplicaId) {
333 let to_drop: Vec<_> = self
334 .introspection_subscribes
335 .iter()
336 .filter(|(_, s)| s.replica_id == replica_id)
337 .map(|(id, _)| *id)
338 .collect();
339
340 for id in to_drop {
341 self.drop_introspection_subscribe(id);
342 }
343 }
344
345 fn drop_introspection_subscribe(&mut self, id: GlobalId) {
346 let Some(subscribe) = self.introspection_subscribes.remove(&id) else {
347 soft_panic_or_log!("attempt to remove unknown introspection subscribe (id={id})");
348 return;
349 };
350
351 info!(
352 %id,
353 replica_id = %subscribe.replica_id,
354 type_ = ?subscribe.spec.introspection_type,
355 "dropping introspection subscribe",
356 );
357
358 let _ = self
362 .controller
363 .compute
364 .drop_collections(subscribe.cluster_id, vec![id]);
365
366 self.controller.storage.update_introspection_collection(
367 subscribe.spec.introspection_type,
368 subscribe.delete_write_op(),
369 );
370 }
371
372 async fn reinstall_introspection_subscribe(&mut self, id: GlobalId) {
373 let Some(mut subscribe) = self.introspection_subscribes.remove(&id) else {
374 soft_panic_or_log!("attempt to reinstall unknown introspection subscribe (id={id})");
375 return;
376 };
377
378 let IntrospectionSubscribe {
384 cluster_id,
385 replica_id,
386 spec,
387 ..
388 } = subscribe;
389 let old_id = id;
390 let (_, new_id) = self.allocate_transient_id();
391
392 info!(
393 %old_id, %new_id, %replica_id,
394 type_ = ?subscribe.spec.introspection_type,
395 "reinstalling introspection subscribe",
396 );
397
398 if let Err(error) = self
399 .controller
400 .compute
401 .drop_collections(cluster_id, vec![old_id])
402 {
403 soft_panic_or_log!(
404 "error dropping compute collection for introspection subscribe: {error} \
405 (id={old_id}, cluster_id={cluster_id})"
406 );
407 }
408
409 subscribe.deferred_write = Some(subscribe.delete_write_op());
412
413 self.introspection_subscribes.insert(new_id, subscribe);
414 self.sequence_introspection_subscribe(new_id, spec, cluster_id, replica_id)
415 .await;
416 }
417
418 pub(super) async fn handle_introspection_subscribe_batch(
423 &mut self,
424 id: GlobalId,
425 batch: SubscribeBatch,
426 ) {
427 let Some(subscribe) = self.introspection_subscribes.get_mut(&id) else {
428 soft_panic_or_log!("updates for unknown introspection subscribe (id={id})");
429 return;
430 };
431
432 let updates = match batch.updates {
433 Ok(updates) if updates.is_empty() => return,
434 Ok(updates) => updates,
435 Err(error) if error == ERROR_TARGET_REPLICA_FAILED => {
436 self.reinstall_introspection_subscribe(id).await;
438 return;
439 }
440 Err(error) => {
441 soft_panic_or_log!(
442 "introspection subscribe produced an error: {error} \
443 (id={id}, subscribe={subscribe:?})",
444 );
445 return;
446 }
447 };
448
449 let replica_id = subscribe.replica_id.to_string();
451 let mut new_updates = Vec::with_capacity(updates.len());
452 let mut new_row = Row::default();
453 for (_time, row, diff) in updates {
454 let mut packer = new_row.packer();
455 packer.push(Datum::String(&replica_id));
456 packer.extend_by_row(&row);
457 new_updates.push((new_row.clone(), diff));
458 }
459
460 if let Some(op) = subscribe.deferred_write.take() {
463 self.controller
464 .storage
465 .update_introspection_collection(subscribe.spec.introspection_type, op);
466 }
467
468 self.controller.storage.update_introspection_collection(
469 subscribe.spec.introspection_type,
470 StorageWriteOp::Append {
471 updates: new_updates,
472 },
473 );
474 }
475}
476
477impl Staged for IntrospectionSubscribeStage {
478 type Ctx = ();
479
480 fn validity(&mut self) -> &mut PlanValidity {
481 match self {
482 Self::OptimizeMir(stage) => &mut stage.validity,
483 Self::TimestampOptimizeLir(stage) => &mut stage.validity,
484 Self::Finish(stage) => &mut stage.validity,
485 }
486 }
487
488 async fn stage(
489 self,
490 coord: &mut Coordinator,
491 _ctx: &mut (),
492 ) -> Result<StageResult<Box<Self>>, AdapterError> {
493 match self {
494 Self::OptimizeMir(stage) => coord.sequence_introspection_subscribe_optimize_mir(stage),
495 Self::TimestampOptimizeLir(stage) => {
496 coord.sequence_introspection_subscribe_timestamp_optimize_lir(stage)
497 }
498 Self::Finish(stage) => coord.sequence_introspection_subscribe_finish(stage).await,
499 }
500 }
501
502 fn message(self, _ctx: (), span: Span) -> super::Message {
503 Message::IntrospectionSubscribeStageReady { span, stage: self }
504 }
505
506 fn cancel_enabled(&self) -> bool {
507 false
508 }
509}
510
/// Static description of one kind of introspection subscribe.
#[derive(Debug)]
pub(super) struct SubscribeSpec {
    /// The introspection type handed to `update_introspection_collection` when the subscribe's
    /// results are written or deleted.
    introspection_type: IntrospectionType,
    /// The SQL text of the subscribe; `to_plan` expects it to plan as a `SUBSCRIBE` statement.
    sql: &'static str,
}
520
521impl SubscribeSpec {
522 fn to_plan(&self, catalog: &dyn SessionCatalog) -> Result<SubscribePlan, anyhow::Error> {
523 let parsed = mz_sql::parse::parse(self.sql)?.into_element();
524 let (stmt, resolved_ids) = mz_sql::names::resolve(catalog, parsed.ast)?;
525 let plan = mz_sql::plan::plan(None, catalog, stmt, &Params::empty(), &resolved_ids)?;
526 match plan {
527 Plan::Subscribe(plan) => Ok(plan),
528 _ => bail!("unexpected plan type: {plan:?}"),
529 }
530 }
531}
532
/// The introspection subscribes installed on each replica by
/// `Coordinator::install_introspection_subscribes`.
const SUBSCRIBES: &[SubscribeSpec] = &[
    // Error counts: the sum of `count` per `export_id`.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeErrorCounts,
        sql: "SUBSCRIBE (
            SELECT export_id, sum(count)
            FROM mz_introspection.mz_compute_error_counts_raw
            GROUP BY export_id
        )",
    },
    // Hydration times: per `export_id`, the maximum `time_ns` over all rows, or NULL while any
    // row still has a NULL `time_ns`. Exports whose ID starts with 't' are excluded.
    // NOTE(review): presumably the 't' prefix marks transient IDs — confirm.
    SubscribeSpec {
        introspection_type: IntrospectionType::ComputeHydrationTimes,
        sql: "SUBSCRIBE (
            SELECT
                export_id,
                CASE count(*) = count(time_ns)
                    WHEN true THEN max(time_ns)
                    ELSE NULL
                END AS time_ns
            FROM mz_introspection.mz_compute_hydration_times_per_worker
            WHERE export_id NOT LIKE 't%'
            GROUP BY export_id
            OPTIONS (AGGREGATE INPUT GROUP SIZE = 1)
        )",
    },
];