1use std::time::{Duration, Instant};
25
26use mz_adapter_types::dyncfgs::CATALOG_INFO_METRICS_RECONCILE_INTERVAL;
27use mz_catalog::memory::objects::{
28 CatalogItem, Cluster, ClusterReplica, ClusterVariant, DataSourceDesc,
29};
30use mz_controller::clusters::ReplicaLocation;
31use mz_ore::metric;
32use mz_ore::metrics::{
33 DeleteOnDropGauge, Histogram, MetricTag, MetricVisibility, MetricsRegistry, UIntGaugeVec,
34};
35use mz_ore::stats::histogram_seconds_buckets;
36use mz_ore::task;
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_repr::CatalogItemId;
39use mz_sql::names::{FullItemName, RawDatabaseSpecifier};
40use prometheus::core::AtomicU64;
41use tokio::sync::oneshot;
42use tracing::{debug, warn};
43
44use crate::catalog::{Catalog, catalog_type_to_audit_object_type};
45use crate::command::{CatalogSnapshot, Command};
46use crate::coord::{Coordinator, Message};
47
/// How long the metrics task sleeps between polls while reconciliation is disabled (i.e. the
/// `CATALOG_INFO_METRICS_RECONCILE_INTERVAL` dyncfg is zero). The task keeps polling so that a
/// later non-zero value is picked up.
const FALLBACK_RECONCILE_INTERVAL: Duration = Duration::from_secs(30);

/// Reconciles that take longer than this are reported with a `warn!` log line.
const RECONCILE_WARN_THRESHOLD: Duration = Duration::from_secs(5);

/// Handle to a single labeled info series.
///
/// NOTE(review): per the `DeleteOnDropGauge` name, dropping the handle is expected to remove the
/// labeled series from its vec — confirm against `mz_ore::metrics`.
type InfoGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
57
/// Exports catalog metadata as Prometheus "info" metrics.
///
/// Each series has the constant value 1 and carries its metadata in its labels. There is one
/// series per reported catalog item global ID, cluster, replica, ingestion/webhook source, and
/// sink. The series are rebuilt from a catalog snapshot by [`CatalogInfoMetrics::reconcile`].
#[derive(Debug)]
pub(crate) struct CatalogInfoMetrics {
    /// `mz_object_info`: one series per global ID of each system or user catalog item.
    object_info: UIntGaugeVec,
    /// `mz_cluster_info`: one series per cluster.
    cluster_info: UIntGaugeVec,
    /// `mz_replica_info`: one series per cluster replica.
    replica_info: UIntGaugeVec,
    /// `mz_source_info`: one series per ingestion, old-syntax ingestion, or webhook source.
    source_info: UIntGaugeVec,
    /// `mz_sink_info`: one series per sink.
    sink_info: UIntGaugeVec,
    /// Handles to every series currently exported by the vecs above.
    /// NOTE(review): handles are delete-on-drop, so clearing this vec presumably removes the
    /// series from the registry — confirm against `mz_ore::metrics`.
    series: Vec<InfoGauge>,
    /// Transient revision of the catalog the series were last built from, or `None` before the
    /// first reconcile.
    last_revision: Option<u64>,
    /// `mz_catalog_info_metrics_reconcile_seconds`: wall-clock time of each rebuild.
    reconcile_seconds: Histogram,
}
80
81impl CatalogInfoMetrics {
82 pub fn new(registry: &MetricsRegistry) -> Self {
83 Self {
84 object_info: registry.register(metric!(
85 name: "mz_object_info",
86 help: "Maps catalog object IDs to the object's name, schema, database, and \
87 type. Constant 1.",
88 var_labels: ["object_id", "global_id", "name", "schema_name", "database_name", "type"],
89 visibility: MetricVisibility::Public,
90 tags: [MetricTag::Environment],
91 )),
92 cluster_info: registry.register(metric!(
93 name: "mz_cluster_info",
94 help: "Maps cluster IDs to the cluster's name and size. Constant 1.",
95 var_labels: ["cluster_id", "name", "size"],
96 visibility: MetricVisibility::Public,
97 tags: [MetricTag::Compute],
98 )),
99 replica_info: registry.register(metric!(
100 name: "mz_replica_info",
101 help: "Maps cluster replica IDs to the replica's name and size. Constant 1.",
102 var_labels: ["replica_id", "cluster_id", "name", "size"],
103 visibility: MetricVisibility::Public,
104 tags: [MetricTag::Compute],
105 )),
106 source_info: registry.register(metric!(
107 name: "mz_source_info",
108 help: "Maps user source IDs to the source's type, envelope type, and \
109 cluster. Constant 1.",
110 var_labels: ["source_id", "type", "envelope_type", "cluster_id"],
111 visibility: MetricVisibility::Public,
112 tags: [MetricTag::Source],
113 )),
114 sink_info: registry.register(metric!(
115 name: "mz_sink_info",
116 help: "Maps user sink IDs to the sink's type, envelope type, and \
117 cluster. Constant 1.",
118 var_labels: ["sink_id", "type", "envelope_type", "cluster_id"],
119 visibility: MetricVisibility::Public,
120 tags: [MetricTag::Sink],
121 )),
122 series: Vec::new(),
123 last_revision: None,
124 reconcile_seconds: registry.register(metric!(
125 name: "mz_catalog_info_metrics_reconcile_seconds",
126 help: "Time taken to rebuild the catalog info metrics from a catalog snapshot.",
127 buckets: histogram_seconds_buckets(0.000_128, 8.0),
128 )),
129 }
130 }
131
132 pub fn reconcile(&mut self, catalog: &Catalog) {
135 if self.last_revision != Some(catalog.transient_revision()) {
136 debug!(
137 revision = catalog.transient_revision(),
138 last_revision = self.last_revision,
139 "reconciling catalog info metrics"
140 );
141 let start = Instant::now();
142 self.populate(catalog);
143 let elapsed = start.elapsed();
144 self.reconcile_seconds.observe(elapsed.as_secs_f64());
145 if elapsed > RECONCILE_WARN_THRESHOLD {
146 warn!(
147 ?elapsed,
148 series = self.series.len(),
149 "catalog info metrics reconcile was slow"
150 );
151 }
152 self.last_revision = Some(catalog.transient_revision());
153 }
154 }
155
156 fn populate(&mut self, catalog: &Catalog) {
158 self.series.clear();
162
163 for entry in catalog.entries() {
164 let full_name = catalog.resolve_full_name(entry.name(), entry.conn_id());
165 self.insert_item(entry.id(), entry.item(), &full_name);
166 self.insert_source_or_sink(entry.id(), entry.item());
167 }
168 for cluster in catalog.clusters() {
169 self.insert_cluster(cluster);
170 for replica in cluster.replicas() {
171 self.insert_replica(replica);
172 }
173 }
174 }
175
176 fn insert_item(&mut self, id: CatalogItemId, item: &CatalogItem, full_name: &FullItemName) {
177 match id {
178 CatalogItemId::IntrospectionSourceIndex(_) | CatalogItemId::Transient(_) => return,
181 CatalogItemId::System(_) | CatalogItemId::User(_) => (),
182 }
183
184 let database_name = match &full_name.database {
186 RawDatabaseSpecifier::Name(name) => name.clone(),
187 RawDatabaseSpecifier::Ambient => String::new(),
188 };
189 let item_type = catalog_type_to_audit_object_type(item.typ()).to_string();
190 for global_id in item.global_ids() {
191 let series = new_series(
192 &self.object_info,
193 vec![
194 id.to_string(),
195 global_id.to_string(),
196 full_name.item.clone(),
197 full_name.schema.clone(),
198 database_name.clone(),
199 item_type.clone(),
200 ],
201 );
202 self.series.push(series);
203 }
204 }
205
206 fn insert_source_or_sink(&mut self, id: CatalogItemId, item: &CatalogItem) {
209 let (info_vec, object_type, envelope_type) = match item {
210 CatalogItem::Source(source) => {
211 match &source.data_source {
212 DataSourceDesc::Ingestion { .. }
213 | DataSourceDesc::OldSyntaxIngestion { .. }
214 | DataSourceDesc::Webhook { .. } => (),
215 DataSourceDesc::IngestionExport { .. }
216 | DataSourceDesc::Progress
217 | DataSourceDesc::Introspection(_)
218 | DataSourceDesc::Catalog => return,
219 }
220 (
221 &self.source_info,
222 source.source_type(),
223 source.data_source.envelope(),
224 )
225 }
226 CatalogItem::Sink(sink) => (&self.sink_info, sink.sink_type(), sink.envelope()),
227 _ => return,
228 };
229
230 let series = new_series(
231 info_vec,
232 vec![
233 id.to_string(),
234 object_type.to_string(),
235 envelope_type.map(|s| s.to_string()).unwrap_or_default(),
236 item.cluster_id()
237 .map(|id| id.to_string())
238 .unwrap_or_default(),
239 ],
240 );
241 self.series.push(series);
242 }
243
244 fn insert_cluster(&mut self, cluster: &Cluster) {
245 let size = match &cluster.config.variant {
246 ClusterVariant::Managed(managed) => managed.size.clone(),
247 ClusterVariant::Unmanaged => String::new(),
248 };
249 let series = new_series(
250 &self.cluster_info,
251 vec![cluster.id.to_string(), cluster.name.clone(), size],
252 );
253 self.series.push(series);
254 }
255
256 fn insert_replica(&mut self, replica: &ClusterReplica) {
257 let size = match &replica.config.location {
258 ReplicaLocation::Managed(managed) => managed.size.clone(),
259 ReplicaLocation::Unmanaged(_) => String::new(),
260 };
261 let series = new_series(
262 &self.replica_info,
263 vec![
264 replica.replica_id.to_string(),
265 replica.cluster_id.to_string(),
266 replica.name.clone(),
267 size,
268 ],
269 );
270 self.series.push(series);
271 }
272}
273
274impl Coordinator {
275 pub(crate) fn spawn_catalog_info_metrics_task(&self) {
278 let internal_cmd_tx = self.internal_cmd_tx.clone();
279 let mut metrics = CatalogInfoMetrics::new(&self.catalog_info_metrics_registry);
280 task::spawn(|| "catalog_info_metrics", async move {
281 loop {
282 let (tx, rx) = oneshot::channel();
283 let send = internal_cmd_tx.send(Message::Command(
284 OpenTelemetryContext::obtain(),
285 Command::CatalogSnapshot { tx },
286 ));
287 if send.is_err() {
289 break;
290 }
291 let Ok(CatalogSnapshot { catalog }) = rx.await else {
292 break;
293 };
294
295 let interval =
298 CATALOG_INFO_METRICS_RECONCILE_INTERVAL.get(catalog.system_config().dyncfgs());
299 if !interval.is_zero() {
300 metrics.reconcile(&catalog);
301 }
302
303 let sleep = if interval.is_zero() {
306 FALLBACK_RECONCILE_INTERVAL
307 } else {
308 interval
309 };
310 tokio::time::sleep(sleep).await;
311 }
312 });
313 }
314}
315
316fn new_series(vec: &UIntGaugeVec, labels: Vec<String>) -> InfoGauge {
317 let gauge = vec.get_delete_on_drop_metric(labels);
318 gauge.set(1);
319 gauge
320}
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use mz_catalog::memory::objects::{
        ClusterConfig, ClusterVariantManaged, Sink, Source, Table, TableDataSource,
    };
    use mz_compute_types::config::ComputeReplicaConfig;
    use mz_controller::clusters::{
        ManagedReplicaLocation, ReplicaAllocation, ReplicaConfig, UnmanagedReplicaLocation,
    };
    use mz_controller_types::{ClusterId, ReplicaId};
    use mz_repr::adt::mz_acl_item::PrivilegeMap;
    use mz_repr::role_id::RoleId;
    use mz_repr::{GlobalId, RelationDesc, RelationVersion, SqlScalarType, VersionedRelationDesc};
    use mz_sql::names::ResolvedIds;
    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaders};
    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection,
    };
    use mz_storage_types::sources::Timeline;

    use super::*;
    use crate::catalog::Op;

    /// Returns the label set of every series of metric `name` currently in `registry`.
    ///
    /// Also asserts that each series has the value 1, as every info series must.
    fn series(registry: &MetricsRegistry, name: &str) -> Vec<BTreeMap<String, String>> {
        registry
            .gather()
            .iter()
            .filter(|family| family.name() == name)
            .flat_map(|family| family.get_metric())
            .map(|metric| {
                assert_eq!(metric.get_gauge().value(), 1.0, "info series must be 1");
                metric
                    .get_label()
                    .iter()
                    .map(|label| (label.name().to_string(), label.value().to_string()))
                    .collect()
            })
            .collect()
    }

    /// Builds an owned label map from `(name, value)` pairs.
    fn labels(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// A single-column table whose root version has global ID `gid`.
    fn test_table(gid: GlobalId) -> CatalogItem {
        CatalogItem::Table(Table {
            create_sql: None,
            desc: VersionedRelationDesc::new(
                RelationDesc::builder()
                    .with_column("a", SqlScalarType::String.nullable(false))
                    .finish(),
            ),
            collections: BTreeMap::from([(RelationVersion::root(), gid)]),
            conn_id: None,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
            data_source: TableDataSource::TableWrites { defaults: vec![] },
        })
    }

    /// A JSON webhook source with user global ID `gid` running on `cluster_id`.
    fn test_webhook_source(gid: u64, cluster_id: ClusterId) -> CatalogItem {
        CatalogItem::Source(Source {
            create_sql: None,
            global_id: GlobalId::User(gid),
            data_source: DataSourceDesc::Webhook {
                validate_using: None,
                body_format: WebhookBodyFormat::Json { array: false },
                headers: WebhookHeaders::default(),
                cluster_id,
            },
            desc: RelationDesc::builder()
                .with_column("a", SqlScalarType::String.nullable(false))
                .finish(),
            timeline: Timeline::EpochMilliseconds,
            resolved_ids: ResolvedIds::empty(),
            custom_logical_compaction_window: None,
            is_retained_metrics_object: false,
        })
    }

    /// A Kafka upsert sink with user global ID `gid` running on `cluster_id`.
    fn test_kafka_sink(gid: u64, cluster_id: ClusterId) -> CatalogItem {
        CatalogItem::Sink(Sink {
            create_sql: "CREATE SINK s FROM t INTO KAFKA CONNECTION c (TOPIC 'topic')".to_string(),
            global_id: GlobalId::User(gid),
            from: GlobalId::User(1),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::User(2),
                connection: CatalogItemId::User(2),
                format: KafkaSinkFormat {
                    key_format: None,
                    value_format: KafkaSinkFormatType::Json,
                },
                relation_key_indices: None,
                key_desc_and_indices: None,
                headers_index: None,
                value_desc: RelationDesc::builder()
                    .with_column("a", SqlScalarType::String.nullable(false))
                    .finish(),
                partition_by: None,
                topic: "topic".to_string(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Duration::from_secs(60),
            }),
            envelope: SinkEnvelope::Upsert,
            with_snapshot: true,
            version: 0,
            resolved_ids: ResolvedIds::empty(),
            cluster_id,
            commit_interval: None,
        })
    }

    /// A managed cluster config of the given `size` with replication factor 1.
    fn test_cluster_config(size: &str) -> ClusterConfig {
        ClusterConfig {
            variant: ClusterVariant::Managed(ClusterVariantManaged {
                size: size.to_string(),
                availability_zones: Vec::new(),
                logging: Default::default(),
                replication_factor: 1,
                optimizer_feature_overrides: Default::default(),
                schedule: Default::default(),
                auto_scaling_strategy: None,
                reconfiguration: None,
                burst: None,
            }),
            workload_class: None,
        }
    }

    /// A managed cluster with no replicas.
    fn test_cluster(id: ClusterId, name: &str, size: &str) -> Cluster {
        Cluster {
            name: name.to_string(),
            id,
            config: test_cluster_config(size),
            log_indexes: BTreeMap::new(),
            bound_objects: BTreeSet::new(),
            replica_id_by_name_: BTreeMap::new(),
            replicas_by_id_: BTreeMap::new(),
            owner_id: RoleId::User(1),
            privileges: PrivilegeMap::default(),
        }
    }

    /// A replica with an unmanaged location (and therefore no size).
    fn test_replica(cluster_id: ClusterId, replica_id: ReplicaId, name: &str) -> ClusterReplica {
        ClusterReplica {
            name: name.to_string(),
            cluster_id,
            replica_id,
            config: ReplicaConfig {
                location: ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
                    storagectl_addrs: Vec::new(),
                    computectl_addrs: Vec::new(),
                }),
                compute: ComputeReplicaConfig {
                    logging: Default::default(),
                },
            },
            owner_id: RoleId::User(1),
        }
    }

    /// A fully qualified item name; `None` for `database` means an ambient (database-less) name.
    fn full_name(database: Option<&str>, schema: &str, item: &str) -> FullItemName {
        FullItemName {
            database: match database {
                Some(database) => RawDatabaseSpecifier::Name(database.to_string()),
                None => RawDatabaseSpecifier::Ambient,
            },
            schema: schema.to_string(),
            item: item.to_string(),
        }
    }

    #[mz_ore::test]
    fn item_insertion_creates_object_info_series() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        metrics.insert_item(
            CatalogItemId::User(1),
            &test_table(GlobalId::User(1)),
            &full_name(Some("materialize"), "public", "t"),
        );

        assert_eq!(
            series(&registry, "mz_object_info"),
            vec![labels(&[
                ("object_id", "u1"),
                ("global_id", "u1"),
                ("name", "t"),
                ("schema_name", "public"),
                ("database_name", "materialize"),
                ("type", "table"),
            ])]
        );
    }

    #[mz_ore::test]
    fn system_items_are_reported() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        // System items live in ambient schemas, so `database_name` is empty.
        metrics.insert_item(
            CatalogItemId::System(456),
            &test_table(GlobalId::System(456)),
            &full_name(None, "mz_catalog", "mz_test"),
        );

        assert_eq!(
            series(&registry, "mz_object_info"),
            vec![labels(&[
                ("object_id", "s456"),
                ("global_id", "s456"),
                ("name", "mz_test"),
                ("schema_name", "mz_catalog"),
                ("database_name", ""),
                ("type", "table"),
            ])]
        );
    }

    #[mz_ore::test]
    fn introspection_indexes_and_temporary_items_are_not_reported() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        metrics.insert_item(
            CatalogItemId::IntrospectionSourceIndex(5),
            &test_table(GlobalId::IntrospectionSourceIndex(5)),
            &full_name(None, "mz_introspection", "mz_test"),
        );
        metrics.insert_item(
            CatalogItemId::Transient(6),
            &test_table(GlobalId::Transient(6)),
            &full_name(None, "mz_temp", "t"),
        );

        assert_eq!(series(&registry, "mz_object_info"), Vec::new());
    }

    #[mz_ore::test]
    fn user_sources_get_source_info_series() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        let cluster_id = ClusterId::user(9).expect("valid id");
        metrics.insert_source_or_sink(CatalogItemId::User(4), &test_webhook_source(4, cluster_id));

        assert_eq!(
            series(&registry, "mz_source_info"),
            vec![labels(&[
                ("source_id", "u4"),
                ("type", "webhook"),
                // Webhook sources have no envelope, which is reported as an empty label.
                ("envelope_type", ""),
                ("cluster_id", "u9"),
            ])]
        );
    }

    #[mz_ore::test]
    fn user_sinks_get_sink_info_series() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        let cluster_id = ClusterId::user(9).expect("valid id");
        metrics.insert_source_or_sink(CatalogItemId::User(5), &test_kafka_sink(5, cluster_id));

        assert_eq!(
            series(&registry, "mz_sink_info"),
            vec![labels(&[
                ("sink_id", "u5"),
                ("type", "kafka"),
                ("envelope_type", "upsert"),
                ("cluster_id", "u9"),
            ])]
        );
    }

    #[mz_ore::test]
    fn cluster_and_replica_insertions_create_info_series() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        let cluster_id = ClusterId::user(7).expect("valid id");
        let replica_id = ReplicaId::User(2);
        metrics.insert_cluster(&test_cluster(cluster_id, "prod", "123cc"));
        metrics.insert_replica(&test_replica(cluster_id, replica_id, "r1"));

        assert_eq!(
            series(&registry, "mz_cluster_info"),
            vec![labels(&[
                ("cluster_id", "u7"),
                ("name", "prod"),
                ("size", "123cc"),
            ])]
        );
        assert_eq!(
            series(&registry, "mz_replica_info"),
            vec![labels(&[
                ("replica_id", "u2"),
                ("cluster_id", "u7"),
                ("name", "r1"),
                // `test_replica` uses an unmanaged location, which has no size.
                ("size", ""),
            ])]
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn replicas_of_unmanaged_clusters_report_their_size() {
        let registry = MetricsRegistry::new();
        let mut metrics = CatalogInfoMetrics::new(&registry);

        // The allocation is built from JSON rather than a struct literal.
        let allocation: ReplicaAllocation =
            serde_json::from_str(r#"{"scale": 2, "workers": 4, "credits_per_hour": "0"}"#)
                .expect("valid allocation");
        let replica = ClusterReplica {
            name: "r2".to_string(),
            cluster_id: ClusterId::user(7).expect("valid id"),
            replica_id: ReplicaId::User(3),
            config: ReplicaConfig {
                location: ReplicaLocation::Managed(ManagedReplicaLocation {
                    allocation,
                    size: "scale=2,workers=4".to_string(),
                    internal: false,
                    billed_as: None,
                    availability_zones: Vec::new(),
                    pending: false,
                }),
                compute: ComputeReplicaConfig {
                    logging: Default::default(),
                },
            },
            owner_id: RoleId::User(1),
        };
        metrics.insert_replica(&replica);

        assert_eq!(
            series(&registry, "mz_replica_info"),
            vec![labels(&[
                ("replica_id", "u3"),
                ("cluster_id", "u7"),
                ("name", "r2"),
                ("size", "scale=2,workers=4"),
            ])]
        );
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn reconcile_rebuilds_from_the_catalog_when_its_revision_changes() {
        Catalog::with_debug(|mut catalog| async move {
            let registry = MetricsRegistry::new();
            let mut metrics = CatalogInfoMetrics::new(&registry);

            let has_test_cluster_series = |registry: &MetricsRegistry| {
                series(registry, "mz_cluster_info")
                    .iter()
                    .any(|s| s["name"] == "test_cluster")
            };

            metrics.reconcile(&catalog);

            // Built-in clusters and their replicas are reported after the first reconcile.
            assert!(
                series(&registry, "mz_cluster_info")
                    .iter()
                    .any(|s| s["name"] == "mz_system")
            );
            assert_ne!(series(&registry, "mz_replica_info"), Vec::new());
            let objects = series(&registry, "mz_object_info");
            // System objects are reported...
            assert!(objects.iter().any(|s| s["object_id"].starts_with("s")));
            // ...but introspection source indexes (`si` IDs) are not.
            assert!(!objects.iter().any(|s| s["object_id"].starts_with("si")));

            // The cluster under test does not exist yet.
            assert!(!has_test_cluster_series(&registry));

            let commit_ts = catalog.current_upper().await;
            let cluster_id = catalog
                .allocate_user_cluster_id(commit_ts)
                .await
                .expect("failed to allocate cluster id");
            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateCluster {
                        id: cluster_id,
                        name: "test_cluster".to_string(),
                        introspection_sources: Vec::new(),
                        owner_id: MZ_SYSTEM_ROLE_ID,
                        config: test_cluster_config("scale=1,workers=2"),
                    }],
                )
                .await
                .expect("failed to transact");

            // Changing the catalog alone does not touch the metrics; only `reconcile` does.
            assert!(!has_test_cluster_series(&registry));

            metrics.reconcile(&catalog);
            assert!(has_test_cluster_series(&registry));

            catalog.expire().await;
        })
        .await
    }
}