Skip to main content

mz_adapter/coord/
info_metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Prometheus `*_info` metrics describing catalog objects.
11//!
12//! These follow the standard Prometheus "info" pattern (`kube_pod_info`
13//! style): one series per object, constant value `1`, with descriptive
14//! labels. They give other metrics a stable `group_left` join target for
15//! resolving object IDs to names.
16//!
//! The metrics are periodically reconciled with the catalog by a
//! background task ([Coordinator::spawn_catalog_info_metrics_task]) that runs
//! on an interval, off the coordinator's main loop. It rebuilds the series
//! from the catalog whenever the catalog's
//! [transient revision](Catalog::transient_revision) changes. The info
//! metrics are only eventually consistent: it's okay if they are not exactly
//! up to date.
23
24use std::time::{Duration, Instant};
25
26use mz_adapter_types::dyncfgs::CATALOG_INFO_METRICS_RECONCILE_INTERVAL;
27use mz_catalog::memory::objects::{
28    CatalogItem, Cluster, ClusterReplica, ClusterVariant, DataSourceDesc,
29};
30use mz_controller::clusters::ReplicaLocation;
31use mz_ore::metric;
32use mz_ore::metrics::{
33    DeleteOnDropGauge, Histogram, MetricTag, MetricVisibility, MetricsRegistry, UIntGaugeVec,
34};
35use mz_ore::stats::histogram_seconds_buckets;
36use mz_ore::task;
37use mz_ore::tracing::OpenTelemetryContext;
38use mz_repr::CatalogItemId;
39use mz_sql::names::{FullItemName, RawDatabaseSpecifier};
40use prometheus::core::AtomicU64;
41use tokio::sync::oneshot;
42use tracing::{debug, warn};
43
44use crate::catalog::{Catalog, catalog_type_to_audit_object_type};
45use crate::command::{CatalogSnapshot, Command};
46use crate::coord::{Coordinator, Message};
47
/// Fallback reconcile cadence: the re-poll cadence used while reconciliation is
/// disabled (a zero [CATALOG_INFO_METRICS_RECONCILE_INTERVAL]), so it can be
/// re-enabled at runtime.
const FALLBACK_RECONCILE_INTERVAL: Duration = Duration::from_secs(30);

/// Reconciles that take longer than this are logged at warn level.
const RECONCILE_WARN_THRESHOLD: Duration = Duration::from_secs(5);

/// Handle to a single info series, labelled by an owned list of label values.
///
/// The handles are kept in [CatalogInfoMetrics] so that dropping them removes
/// the corresponding series from the registry.
type InfoGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
57
/// `*_info` metrics for catalog objects, mirroring the in-memory catalog.
///
/// System and user items, clusters, and replicas are all reported. Temporary
/// (session-scoped) items and per-cluster introspection source indexes are
/// not.
#[derive(Debug)]
pub(crate) struct CatalogInfoMetrics {
    /// `mz_object_info`: one series per global ID of each reported item.
    object_info: UIntGaugeVec,
    /// `mz_cluster_info`: one series per cluster.
    cluster_info: UIntGaugeVec,
    /// `mz_replica_info`: one series per cluster replica.
    replica_info: UIntGaugeVec,
    /// `mz_source_info`: one series per reported source.
    source_info: UIntGaugeVec,
    /// `mz_sink_info`: one series per sink.
    sink_info: UIntGaugeVec,
    /// Handles to all live series, held purely so that dropping them removes
    /// the series from the registry. Series are only ever created and dropped
    /// wholesale, by [CatalogInfoMetrics::populate].
    series: Vec<InfoGauge>,
    /// The [Catalog::transient_revision] the metrics were last populated
    /// from. Used to avoid rebuilding the metrics if the catalog has not changed.
    /// `None` until the first reconcile.
    last_revision: Option<u64>,
    /// Times a full (re)build of the series in [CatalogInfoMetrics::populate].
    reconcile_seconds: Histogram,
}
80
81impl CatalogInfoMetrics {
82    pub fn new(registry: &MetricsRegistry) -> Self {
83        Self {
84            object_info: registry.register(metric!(
85                name: "mz_object_info",
86                help: "Maps catalog object IDs to the object's name, schema, database, and \
87                       type. Constant 1.",
88                var_labels: ["object_id", "global_id", "name", "schema_name", "database_name", "type"],
89                visibility: MetricVisibility::Public,
90                tags: [MetricTag::Environment],
91            )),
92            cluster_info: registry.register(metric!(
93                name: "mz_cluster_info",
94                help: "Maps cluster IDs to the cluster's name and size. Constant 1.",
95                var_labels: ["cluster_id", "name", "size"],
96                visibility: MetricVisibility::Public,
97                tags: [MetricTag::Compute],
98            )),
99            replica_info: registry.register(metric!(
100                name: "mz_replica_info",
101                help: "Maps cluster replica IDs to the replica's name and size. Constant 1.",
102                var_labels: ["replica_id", "cluster_id", "name", "size"],
103                visibility: MetricVisibility::Public,
104                tags: [MetricTag::Compute],
105            )),
106            source_info: registry.register(metric!(
107                name: "mz_source_info",
108                help: "Maps user source IDs to the source's type, envelope type, and \
109                       cluster. Constant 1.",
110                var_labels: ["source_id", "type", "envelope_type", "cluster_id"],
111                visibility: MetricVisibility::Public,
112                tags: [MetricTag::Source],
113            )),
114            sink_info: registry.register(metric!(
115                name: "mz_sink_info",
116                help: "Maps user sink IDs to the sink's type, envelope type, and \
117                       cluster. Constant 1.",
118                var_labels: ["sink_id", "type", "envelope_type", "cluster_id"],
119                visibility: MetricVisibility::Public,
120                tags: [MetricTag::Sink],
121            )),
122            series: Vec::new(),
123            last_revision: None,
124            reconcile_seconds: registry.register(metric!(
125                name: "mz_catalog_info_metrics_reconcile_seconds",
126                help: "Time taken to rebuild the catalog info metrics from a catalog snapshot.",
127                buckets: histogram_seconds_buckets(0.000_128, 8.0),
128            )),
129        }
130    }
131
132    /// Reconciles the info metrics with the given catalog, unless they
133    /// already reflect its revision.
134    pub fn reconcile(&mut self, catalog: &Catalog) {
135        if self.last_revision != Some(catalog.transient_revision()) {
136            debug!(
137                revision = catalog.transient_revision(),
138                last_revision = self.last_revision,
139                "reconciling catalog info metrics"
140            );
141            let start = Instant::now();
142            self.populate(catalog);
143            let elapsed = start.elapsed();
144            self.reconcile_seconds.observe(elapsed.as_secs_f64());
145            if elapsed > RECONCILE_WARN_THRESHOLD {
146                warn!(
147                    ?elapsed,
148                    series = self.series.len(),
149                    "catalog info metrics reconcile was slow"
150                );
151            }
152            self.last_revision = Some(catalog.transient_revision());
153        }
154    }
155
156    /// (Re)populates all info metrics from the given catalog.
157    fn populate(&mut self, catalog: &Catalog) {
158        // Drop all existing series before creating replacements: dropping a
159        // stale handle whose labels match a newly created series would remove
160        // the new series from the vector
161        self.series.clear();
162
163        for entry in catalog.entries() {
164            let full_name = catalog.resolve_full_name(entry.name(), entry.conn_id());
165            self.insert_item(entry.id(), entry.item(), &full_name);
166            self.insert_source_or_sink(entry.id(), entry.item());
167        }
168        for cluster in catalog.clusters() {
169            self.insert_cluster(cluster);
170            for replica in cluster.replicas() {
171                self.insert_replica(replica);
172            }
173        }
174    }
175
176    fn insert_item(&mut self, id: CatalogItemId, item: &CatalogItem, full_name: &FullItemName) {
177        match id {
178            // Ignore per-cluster introspection source indexes and temporary
179            // (session-scoped) items.
180            CatalogItemId::IntrospectionSourceIndex(_) | CatalogItemId::Transient(_) => return,
181            CatalogItemId::System(_) | CatalogItemId::User(_) => (),
182        }
183
184        // System (ambient) objects don't belong to a database.
185        let database_name = match &full_name.database {
186            RawDatabaseSpecifier::Name(name) => name.clone(),
187            RawDatabaseSpecifier::Ambient => String::new(),
188        };
189        let item_type = catalog_type_to_audit_object_type(item.typ()).to_string();
190        for global_id in item.global_ids() {
191            let series = new_series(
192                &self.object_info,
193                vec![
194                    id.to_string(),
195                    global_id.to_string(),
196                    full_name.item.clone(),
197                    full_name.schema.clone(),
198                    database_name.clone(),
199                    item_type.clone(),
200                ],
201            );
202            self.series.push(series);
203        }
204    }
205
206    /// Reports an `mz_source_info` or `mz_sink_info` series for `item` if it
207    /// is a source or sink. Progress sources and subsources are not reported.
208    fn insert_source_or_sink(&mut self, id: CatalogItemId, item: &CatalogItem) {
209        let (info_vec, object_type, envelope_type) = match item {
210            CatalogItem::Source(source) => {
211                match &source.data_source {
212                    DataSourceDesc::Ingestion { .. }
213                    | DataSourceDesc::OldSyntaxIngestion { .. }
214                    | DataSourceDesc::Webhook { .. } => (),
215                    DataSourceDesc::IngestionExport { .. }
216                    | DataSourceDesc::Progress
217                    | DataSourceDesc::Introspection(_)
218                    | DataSourceDesc::Catalog => return,
219                }
220                (
221                    &self.source_info,
222                    source.source_type(),
223                    source.data_source.envelope(),
224                )
225            }
226            CatalogItem::Sink(sink) => (&self.sink_info, sink.sink_type(), sink.envelope()),
227            _ => return,
228        };
229
230        let series = new_series(
231            info_vec,
232            vec![
233                id.to_string(),
234                object_type.to_string(),
235                envelope_type.map(|s| s.to_string()).unwrap_or_default(),
236                item.cluster_id()
237                    .map(|id| id.to_string())
238                    .unwrap_or_default(),
239            ],
240        );
241        self.series.push(series);
242    }
243
244    fn insert_cluster(&mut self, cluster: &Cluster) {
245        let size = match &cluster.config.variant {
246            ClusterVariant::Managed(managed) => managed.size.clone(),
247            ClusterVariant::Unmanaged => String::new(),
248        };
249        let series = new_series(
250            &self.cluster_info,
251            vec![cluster.id.to_string(), cluster.name.clone(), size],
252        );
253        self.series.push(series);
254    }
255
256    fn insert_replica(&mut self, replica: &ClusterReplica) {
257        let size = match &replica.config.location {
258            ReplicaLocation::Managed(managed) => managed.size.clone(),
259            ReplicaLocation::Unmanaged(_) => String::new(),
260        };
261        let series = new_series(
262            &self.replica_info,
263            vec![
264                replica.replica_id.to_string(),
265                replica.cluster_id.to_string(),
266                replica.name.clone(),
267                size,
268            ],
269        );
270        self.series.push(series);
271    }
272}
273
274impl Coordinator {
275    /// Spawns a background task that keeps the catalog info metrics in sync with
276    /// the catalog.
277    pub(crate) fn spawn_catalog_info_metrics_task(&self) {
278        let internal_cmd_tx = self.internal_cmd_tx.clone();
279        let mut metrics = CatalogInfoMetrics::new(&self.catalog_info_metrics_registry);
280        task::spawn(|| "catalog_info_metrics", async move {
281            loop {
282                let (tx, rx) = oneshot::channel();
283                let send = internal_cmd_tx.send(Message::Command(
284                    OpenTelemetryContext::obtain(),
285                    Command::CatalogSnapshot { tx },
286                ));
287                // Bail if the coordinator has gone away.
288                if send.is_err() {
289                    break;
290                }
291                let Ok(CatalogSnapshot { catalog }) = rx.await else {
292                    break;
293                };
294
295                // The reconcile cadence is a dyncfg; a zero interval disables
296                // reconciliation.
297                let interval =
298                    CATALOG_INFO_METRICS_RECONCILE_INTERVAL.get(catalog.system_config().dyncfgs());
299                if !interval.is_zero() {
300                    metrics.reconcile(&catalog);
301                }
302
303                // When disabled, keep polling at the fallback cadence so it can
304                // be re-enabled at runtime.
305                let sleep = if interval.is_zero() {
306                    FALLBACK_RECONCILE_INTERVAL
307                } else {
308                    interval
309                };
310                tokio::time::sleep(sleep).await;
311            }
312        });
313    }
314}
315
316fn new_series(vec: &UIntGaugeVec, labels: Vec<String>) -> InfoGauge {
317    let gauge = vec.get_delete_on_drop_metric(labels);
318    gauge.set(1);
319    gauge
320}
321#[cfg(test)]
322mod tests {
323    use std::collections::{BTreeMap, BTreeSet};
324
325    use mz_catalog::memory::objects::{
326        ClusterConfig, ClusterVariantManaged, Sink, Source, Table, TableDataSource,
327    };
328    use mz_compute_types::config::ComputeReplicaConfig;
329    use mz_controller::clusters::{
330        ManagedReplicaLocation, ReplicaAllocation, ReplicaConfig, UnmanagedReplicaLocation,
331    };
332    use mz_controller_types::{ClusterId, ReplicaId};
333    use mz_repr::adt::mz_acl_item::PrivilegeMap;
334    use mz_repr::role_id::RoleId;
335    use mz_repr::{GlobalId, RelationDesc, RelationVersion, SqlScalarType, VersionedRelationDesc};
336    use mz_sql::names::ResolvedIds;
337    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaders};
338    use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
339    use mz_storage_types::sinks::{
340        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
341        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection,
342    };
343    use mz_storage_types::sources::Timeline;
344
345    use super::*;
346    use crate::catalog::Op;
347
348    /// Returns the label maps of all series of the metric `name`
349    fn series(registry: &MetricsRegistry, name: &str) -> Vec<BTreeMap<String, String>> {
350        registry
351            .gather()
352            .iter()
353            .filter(|family| family.name() == name)
354            .flat_map(|family| family.get_metric())
355            .map(|metric| {
356                // ensure every series has the value 1.
357                assert_eq!(metric.get_gauge().value(), 1.0, "info series must be 1");
358                metric
359                    .get_label()
360                    .iter()
361                    .map(|label| (label.name().to_string(), label.value().to_string()))
362                    .collect()
363            })
364            .collect()
365    }
366
367    fn labels(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
368        pairs
369            .iter()
370            .map(|(k, v)| (k.to_string(), v.to_string()))
371            .collect()
372    }
373
374    fn test_table(gid: GlobalId) -> CatalogItem {
375        CatalogItem::Table(Table {
376            create_sql: None,
377            desc: VersionedRelationDesc::new(
378                RelationDesc::builder()
379                    .with_column("a", SqlScalarType::String.nullable(false))
380                    .finish(),
381            ),
382            collections: BTreeMap::from([(RelationVersion::root(), gid)]),
383            conn_id: None,
384            resolved_ids: ResolvedIds::empty(),
385            custom_logical_compaction_window: None,
386            is_retained_metrics_object: false,
387            data_source: TableDataSource::TableWrites { defaults: vec![] },
388        })
389    }
390
391    fn test_webhook_source(gid: u64, cluster_id: ClusterId) -> CatalogItem {
392        CatalogItem::Source(Source {
393            create_sql: None,
394            global_id: GlobalId::User(gid),
395            data_source: DataSourceDesc::Webhook {
396                validate_using: None,
397                body_format: WebhookBodyFormat::Json { array: false },
398                headers: WebhookHeaders::default(),
399                cluster_id,
400            },
401            desc: RelationDesc::builder()
402                .with_column("a", SqlScalarType::String.nullable(false))
403                .finish(),
404            timeline: Timeline::EpochMilliseconds,
405            resolved_ids: ResolvedIds::empty(),
406            custom_logical_compaction_window: None,
407            is_retained_metrics_object: false,
408        })
409    }
410
411    fn test_kafka_sink(gid: u64, cluster_id: ClusterId) -> CatalogItem {
412        CatalogItem::Sink(Sink {
413            create_sql: "CREATE SINK s FROM t INTO KAFKA CONNECTION c (TOPIC 'topic')".to_string(),
414            global_id: GlobalId::User(gid),
415            from: GlobalId::User(1),
416            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
417                connection_id: CatalogItemId::User(2),
418                connection: CatalogItemId::User(2),
419                format: KafkaSinkFormat {
420                    key_format: None,
421                    value_format: KafkaSinkFormatType::Json,
422                },
423                relation_key_indices: None,
424                key_desc_and_indices: None,
425                headers_index: None,
426                value_desc: RelationDesc::builder()
427                    .with_column("a", SqlScalarType::String.nullable(false))
428                    .finish(),
429                partition_by: None,
430                topic: "topic".to_string(),
431                topic_options: Default::default(),
432                compression_type: KafkaSinkCompressionType::None,
433                progress_group_id: KafkaIdStyle::Legacy,
434                transactional_id: KafkaIdStyle::Legacy,
435                topic_metadata_refresh_interval: Duration::from_secs(60),
436            }),
437            envelope: SinkEnvelope::Upsert,
438            with_snapshot: true,
439            version: 0,
440            resolved_ids: ResolvedIds::empty(),
441            cluster_id,
442            commit_interval: None,
443        })
444    }
445
446    fn test_cluster_config(size: &str) -> ClusterConfig {
447        ClusterConfig {
448            variant: ClusterVariant::Managed(ClusterVariantManaged {
449                size: size.to_string(),
450                availability_zones: Vec::new(),
451                logging: Default::default(),
452                replication_factor: 1,
453                optimizer_feature_overrides: Default::default(),
454                schedule: Default::default(),
455                auto_scaling_strategy: None,
456                reconfiguration: None,
457                burst: None,
458            }),
459            workload_class: None,
460        }
461    }
462
463    fn test_cluster(id: ClusterId, name: &str, size: &str) -> Cluster {
464        Cluster {
465            name: name.to_string(),
466            id,
467            config: test_cluster_config(size),
468            log_indexes: BTreeMap::new(),
469            bound_objects: BTreeSet::new(),
470            replica_id_by_name_: BTreeMap::new(),
471            replicas_by_id_: BTreeMap::new(),
472            owner_id: RoleId::User(1),
473            privileges: PrivilegeMap::default(),
474        }
475    }
476
477    fn test_replica(cluster_id: ClusterId, replica_id: ReplicaId, name: &str) -> ClusterReplica {
478        ClusterReplica {
479            name: name.to_string(),
480            cluster_id,
481            replica_id,
482            config: ReplicaConfig {
483                location: ReplicaLocation::Unmanaged(UnmanagedReplicaLocation {
484                    storagectl_addrs: Vec::new(),
485                    computectl_addrs: Vec::new(),
486                }),
487                compute: ComputeReplicaConfig {
488                    logging: Default::default(),
489                },
490            },
491            owner_id: RoleId::User(1),
492        }
493    }
494
495    fn full_name(database: Option<&str>, schema: &str, item: &str) -> FullItemName {
496        FullItemName {
497            database: match database {
498                Some(database) => RawDatabaseSpecifier::Name(database.to_string()),
499                None => RawDatabaseSpecifier::Ambient,
500            },
501            schema: schema.to_string(),
502            item: item.to_string(),
503        }
504    }
505
506    #[mz_ore::test]
507    fn item_insertion_creates_object_info_series() {
508        let registry = MetricsRegistry::new();
509        let mut metrics = CatalogInfoMetrics::new(&registry);
510
511        metrics.insert_item(
512            CatalogItemId::User(1),
513            &test_table(GlobalId::User(1)),
514            &full_name(Some("materialize"), "public", "t"),
515        );
516
517        assert_eq!(
518            series(&registry, "mz_object_info"),
519            vec![labels(&[
520                ("object_id", "u1"),
521                ("global_id", "u1"),
522                ("name", "t"),
523                ("schema_name", "public"),
524                ("database_name", "materialize"),
525                ("type", "table"),
526            ])]
527        );
528    }
529
530    #[mz_ore::test]
531    fn system_items_are_reported() {
532        let registry = MetricsRegistry::new();
533        let mut metrics = CatalogInfoMetrics::new(&registry);
534
535        metrics.insert_item(
536            CatalogItemId::System(456),
537            &test_table(GlobalId::System(456)),
538            // System schemas are ambient, i.e. not in a database.
539            &full_name(None, "mz_catalog", "mz_test"),
540        );
541
542        assert_eq!(
543            series(&registry, "mz_object_info"),
544            vec![labels(&[
545                ("object_id", "s456"),
546                ("global_id", "s456"),
547                ("name", "mz_test"),
548                ("schema_name", "mz_catalog"),
549                ("database_name", ""),
550                ("type", "table"),
551            ])]
552        );
553    }
554
555    #[mz_ore::test]
556    fn introspection_indexes_and_temporary_items_are_not_reported() {
557        let registry = MetricsRegistry::new();
558        let mut metrics = CatalogInfoMetrics::new(&registry);
559
560        metrics.insert_item(
561            CatalogItemId::IntrospectionSourceIndex(5),
562            &test_table(GlobalId::IntrospectionSourceIndex(5)),
563            &full_name(None, "mz_introspection", "mz_test"),
564        );
565        metrics.insert_item(
566            CatalogItemId::Transient(6),
567            &test_table(GlobalId::Transient(6)),
568            &full_name(None, "mz_temp", "t"),
569        );
570
571        assert_eq!(series(&registry, "mz_object_info"), Vec::new());
572    }
573
574    #[mz_ore::test]
575    fn user_sources_get_source_info_series() {
576        let registry = MetricsRegistry::new();
577        let mut metrics = CatalogInfoMetrics::new(&registry);
578
579        let cluster_id = ClusterId::user(9).expect("valid id");
580        metrics.insert_source_or_sink(CatalogItemId::User(4), &test_webhook_source(4, cluster_id));
581
582        assert_eq!(
583            series(&registry, "mz_source_info"),
584            vec![labels(&[
585                ("source_id", "u4"),
586                ("type", "webhook"),
587                // Webhook sources have no envelope.
588                ("envelope_type", ""),
589                ("cluster_id", "u9"),
590            ])]
591        );
592    }
593
594    #[mz_ore::test]
595    fn user_sinks_get_sink_info_series() {
596        let registry = MetricsRegistry::new();
597        let mut metrics = CatalogInfoMetrics::new(&registry);
598
599        let cluster_id = ClusterId::user(9).expect("valid id");
600        metrics.insert_source_or_sink(CatalogItemId::User(5), &test_kafka_sink(5, cluster_id));
601
602        assert_eq!(
603            series(&registry, "mz_sink_info"),
604            vec![labels(&[
605                ("sink_id", "u5"),
606                ("type", "kafka"),
607                ("envelope_type", "upsert"),
608                ("cluster_id", "u9"),
609            ])]
610        );
611    }
612
613    #[mz_ore::test]
614    fn cluster_and_replica_insertions_create_info_series() {
615        let registry = MetricsRegistry::new();
616        let mut metrics = CatalogInfoMetrics::new(&registry);
617
618        let cluster_id = ClusterId::user(7).expect("valid id");
619        let replica_id = ReplicaId::User(2);
620        metrics.insert_cluster(&test_cluster(cluster_id, "prod", "123cc"));
621        metrics.insert_replica(&test_replica(cluster_id, replica_id, "r1"));
622
623        assert_eq!(
624            series(&registry, "mz_cluster_info"),
625            vec![labels(&[
626                ("cluster_id", "u7"),
627                ("name", "prod"),
628                ("size", "123cc"),
629            ])]
630        );
631        assert_eq!(
632            series(&registry, "mz_replica_info"),
633            vec![labels(&[
634                ("replica_id", "u2"),
635                ("cluster_id", "u7"),
636                ("name", "r1"),
637                // Replicas with unmanaged *locations* (user-specified
638                // addresses) have no allocation, and so no size.
639                ("size", ""),
640            ])]
641        );
642    }
643
644    #[mz_ore::test]
645    #[cfg_attr(miri, ignore)] // can't call foreign function `decContextDefault` on OS `linux`
646    fn replicas_of_unmanaged_clusters_report_their_size() {
647        let registry = MetricsRegistry::new();
648        let mut metrics = CatalogInfoMetrics::new(&registry);
649
650        // Replicas of unmanaged clusters (`CREATE CLUSTER c REPLICAS (..)`)
651        // have a managed *location* with an allocation and a size of their
652        // own, even though the cluster variant carries none.
653        let allocation: ReplicaAllocation =
654            serde_json::from_str(r#"{"scale": 2, "workers": 4, "credits_per_hour": "0"}"#)
655                .expect("valid allocation");
656        let replica = ClusterReplica {
657            name: "r2".to_string(),
658            cluster_id: ClusterId::user(7).expect("valid id"),
659            replica_id: ReplicaId::User(3),
660            config: ReplicaConfig {
661                location: ReplicaLocation::Managed(ManagedReplicaLocation {
662                    allocation,
663                    size: "scale=2,workers=4".to_string(),
664                    internal: false,
665                    billed_as: None,
666                    availability_zones: Vec::new(),
667                    pending: false,
668                }),
669                compute: ComputeReplicaConfig {
670                    logging: Default::default(),
671                },
672            },
673            owner_id: RoleId::User(1),
674        };
675        metrics.insert_replica(&replica);
676
677        assert_eq!(
678            series(&registry, "mz_replica_info"),
679            vec![labels(&[
680                ("replica_id", "u3"),
681                ("cluster_id", "u7"),
682                ("name", "r2"),
683                ("size", "scale=2,workers=4"),
684            ])]
685        );
686    }
687
    /// End-to-end check of [CatalogInfoMetrics::reconcile] against a debug
    /// catalog: the first reconcile reports the catalog's contents, a
    /// catalog change is not visible until the next reconcile, and that
    /// reconcile picks it up.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn reconcile_rebuilds_from_the_catalog_when_its_revision_changes() {
        Catalog::with_debug(|mut catalog| async move {
            let registry = MetricsRegistry::new();
            let mut metrics = CatalogInfoMetrics::new(&registry);

            // Whether a `mz_cluster_info` series named `test_cluster` is
            // currently exported.
            let has_test_cluster_series = |registry: &MetricsRegistry| {
                series(registry, "mz_cluster_info")
                    .iter()
                    .any(|s| s["name"] == "test_cluster")
            };

            metrics.reconcile(&catalog);

            // The initial reconciliation reports the catalog's contents,
            // including system items, but not introspection source indexes.
            assert!(
                series(&registry, "mz_cluster_info")
                    .iter()
                    .any(|s| s["name"] == "mz_system")
            );
            assert_ne!(series(&registry, "mz_replica_info"), Vec::new());
            let objects = series(&registry, "mz_object_info");
            // System items
            assert!(objects.iter().any(|s| s["object_id"].starts_with("s")));
            // Introspection source indexes
            assert!(!objects.iter().any(|s| s["object_id"].starts_with("si")));

            // Assert that the test cluster series is not present.
            assert!(!has_test_cluster_series(&registry));

            // Create a cluster in the catalog.
            let commit_ts = catalog.current_upper().await;
            let cluster_id = catalog
                .allocate_user_cluster_id(commit_ts)
                .await
                .expect("failed to allocate cluster id");
            // Re-read the upper before transacting rather than reusing the
            // timestamp passed to the ID allocation above.
            let commit_ts = catalog.current_upper().await;
            catalog
                .transact(
                    None,
                    commit_ts,
                    None,
                    vec![Op::CreateCluster {
                        id: cluster_id,
                        name: "test_cluster".to_string(),
                        introspection_sources: Vec::new(),
                        owner_id: MZ_SYSTEM_ROLE_ID,
                        config: test_cluster_config("scale=1,workers=2"),
                    }],
                )
                .await
                .expect("failed to transact");

            // The metrics are stale until the next reconciliation.
            assert!(!has_test_cluster_series(&registry));

            // The second reconcile picks up the newly created cluster.
            metrics.reconcile(&catalog);
            assert!(has_test_cluster_series(&registry));

            catalog.expire().await;
        })
        .await
    }
753}