Skip to main content

mz_ore/metrics/
rule.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Enrichment rules for Prometheus metrics.
17//!
18//! Subsystems register [`Rule`]s alongside their metrics on a
19//! [`crate::metrics::MetricsRegistry`]. When env's federated `/metrics/public`
20//! endpoint scrapes a remote registry, it resolves the rules against the
21//! catalog to attach human-readable name labels (e.g. `cluster_name`,
22//! `source_name`) onto metrics.
23
24use std::collections::BTreeMap;
25
26use prometheus::proto::{LabelPair, MetricFamily};
27use serde::{Deserialize, Serialize};
28
/// Fully-qualified name of a catalog object, broken into its three parts.
///
/// `database` may be the empty string, e.g. for system objects that do not
/// belong to any database.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectName {
    /// Name of the database that contains the object.
    pub database: String,
    /// Name of the schema that contains the object.
    pub schema: String,
    /// Name of the object (item) itself.
    pub object: String,
}
39
40/// Resolves IDs to their names
41pub trait NameLookup {
42    /// Returns the name of the cluster with the given ID, if it exists.
43    fn cluster_name(&self, cluster_id: &str) -> Option<String>;
44    /// Returns the name of the replica with the given (cluster, replica) IDs.
45    fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option<String>;
46    /// Returns the fully-qualified name of the catalog object with the given
47    /// global ID.
48    fn object_name(&self, global_id: &str) -> Option<ObjectName>;
49}
50
51/// A declarative enrichment rule applied to a metric family at scrape time.
52///
53/// Each variant reads one or more ID labels already present on a metric and
54/// adds one or more resolved name labels.
55#[derive(
56    Clone,
57    Debug,
58    Serialize,
59    Deserialize,
60    PartialEq,
61    Eq,
62    PartialOrd,
63    Ord,
64    Hash
65)]
66#[serde(rename_all = "snake_case", tag = "kind")]
67pub enum Rule {
68    /// Reads the cluster ID from `cluster_id_label` and writes the resolved
69    /// cluster name into `output_label`.
70    ClusterNameLookup {
71        /// Name of the label on the metric that carries the cluster ID.
72        cluster_id_label: String,
73        /// Name of the label to add with the resolved cluster name.
74        output_label: String,
75    },
76    /// Reads the cluster ID from `cluster_id_label` and the replica ID from
77    /// `replica_id_label`, then writes the resolved replica name into
78    /// `output_label`.
79    ReplicaNameLookup {
80        /// Name of the label on the metric that carries the cluster ID.
81        cluster_id_label: String,
82        /// Name of the label on the metric that carries the replica ID.
83        replica_id_label: String,
84        /// Name of the label to add with the resolved replica name.
85        output_label: String,
86    },
87    /// Reads a `GlobalId` from `object_id_label` and writes the resolved
88    /// catalog object's fully-qualified name into three labels: the object
89    /// name into `output_object_label`, the schema into `output_schema_label`,
90    /// and the database into `output_database_label` (empty for system objects).
91    ObjectNameLookup {
92        /// Name of the label on the metric that carries the global ID.
93        object_id_label: String,
94        /// Name of the label to add with the resolved object name.
95        output_object_label: String,
96        /// Name of the label to add with the resolved schema name.
97        output_schema_label: String,
98        /// Name of the label to add with the resolved database name.
99        output_database_label: String,
100    },
101}
102
103/// Builds a Prometheus [`LabelPair`] from a label name and value.
104fn label_pair(name: String, value: String) -> LabelPair {
105    let mut pair = LabelPair::default();
106    pair.set_name(name);
107    pair.set_value(value);
108    pair
109}
110
111impl Rule {
112    /// Builds an [`Rule::ObjectNameLookup`] that resolves `object_id_label` and
113    /// writes the object name to `output_object_label`, with the schema and
114    /// database written to the default `schema_name`/`database_name` labels.
115    pub fn object_name_lookup_with_default_labels(
116        object_id_label: impl Into<String>,
117        output_object_label: impl Into<String>,
118    ) -> Rule {
119        Rule::ObjectNameLookup {
120            object_id_label: object_id_label.into(),
121            output_object_label: output_object_label.into(),
122            output_schema_label: "schema_name".into(),
123            output_database_label: "database_name".into(),
124        }
125    }
126
127    /// Resolves this rule against a metric's current `labels` using `lookup`,
128    /// returning the label pairs the rule wants to add. Empty if the required
129    /// input labels are missing or the lookup fails to resolve.
130    fn resolve<L: NameLookup>(&self, labels: &BTreeMap<&str, &str>, lookup: &L) -> Vec<LabelPair> {
131        match self {
132            Rule::ClusterNameLookup {
133                cluster_id_label,
134                output_label,
135            } => labels
136                .get(cluster_id_label.as_str())
137                .copied()
138                .and_then(|cid| lookup.cluster_name(cid))
139                .map(|name| vec![label_pair(output_label.clone(), name)])
140                .unwrap_or_default(),
141            Rule::ReplicaNameLookup {
142                cluster_id_label,
143                replica_id_label,
144                output_label,
145            } => match (
146                labels.get(cluster_id_label.as_str()).copied(),
147                labels.get(replica_id_label.as_str()).copied(),
148            ) {
149                (Some(cid), Some(rid)) => lookup
150                    .replica_name(cid, rid)
151                    .map(|name| vec![label_pair(output_label.clone(), name)])
152                    .unwrap_or_default(),
153                _ => Vec::new(),
154            },
155            Rule::ObjectNameLookup {
156                object_id_label,
157                output_object_label,
158                output_schema_label,
159                output_database_label,
160            } => {
161                let Some(name) = labels
162                    .get(object_id_label.as_str())
163                    .copied()
164                    .and_then(|oid| lookup.object_name(oid))
165                else {
166                    return Vec::new();
167                };
168                vec![
169                    label_pair(output_object_label.clone(), name.object),
170                    label_pair(output_schema_label.clone(), name.schema),
171                    label_pair(output_database_label.clone(), name.database),
172                ]
173            }
174        }
175    }
176
177    /// Applies the rule to every metric in `family`, adding the resolved name
178    /// labels. A label that is already present is left untouched (so rules
179    /// never overwrite existing values or produce duplicates).
180    pub fn apply<L: NameLookup>(&self, family: &mut MetricFamily, lookup: &L) {
181        for metric in family.mut_metric() {
182            let labels: BTreeMap<&str, &str> = metric
183                .get_label()
184                .iter()
185                .map(|l| (l.name(), l.value()))
186                .collect();
187            let labels_to_add: Vec<LabelPair> = self
188                .resolve(&labels, lookup)
189                .into_iter()
190                // Skip any resolved label already on the metric, so we never
191                // overwrite an existing value or emit a duplicate label.
192                .filter(|l| !labels.contains_key(l.name()))
193                .collect();
194
195            let mut all = metric.take_label();
196            all.extend(labels_to_add);
197            metric.set_label(all);
198        }
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use prometheus::proto::{Counter, Metric, MetricFamily, MetricType};

    use super::*;

    /// In-memory [`NameLookup`] for unit tests.
    #[derive(Default)]
    struct FakeCatalog {
        clusters: BTreeMap<String, String>,
        replicas: BTreeMap<(String, String), String>,
        objects: BTreeMap<String, ObjectName>,
    }

    impl FakeCatalog {
        fn with_cluster(mut self, id: &str, name: &str) -> Self {
            self.clusters.insert(id.into(), name.into());
            self
        }
        fn with_replica(mut self, cid: &str, rid: &str, name: &str) -> Self {
            self.replicas.insert((cid.into(), rid.into()), name.into());
            self
        }
        fn with_object(mut self, id: &str, object: &str, schema: &str, database: &str) -> Self {
            self.objects.insert(
                id.into(),
                ObjectName {
                    database: database.into(),
                    schema: schema.into(),
                    object: object.into(),
                },
            );
            self
        }
    }

    impl NameLookup for FakeCatalog {
        fn cluster_name(&self, cluster_id: &str) -> Option<String> {
            self.clusters.get(cluster_id).cloned()
        }
        fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option<String> {
            self.replicas
                .get(&(cluster_id.to_owned(), replica_id.to_owned()))
                .cloned()
        }
        fn object_name(&self, global_id: &str) -> Option<ObjectName> {
            self.objects.get(global_id).cloned()
        }
    }

    /// Builds a [`LabelPair`] through the production helper, so tests build
    /// labels exactly the way rules do.
    fn label(name: &str, value: &str) -> LabelPair {
        label_pair(name.into(), value.into())
    }

    fn family_with_labels(labels: Vec<LabelPair>) -> MetricFamily {
        let mut family = MetricFamily::default();
        family.set_name("test_metric".into());
        family.set_field_type(MetricType::COUNTER);
        family.set_help("help for test_metric".into());
        let mut metric = Metric::default();
        let mut counter = Counter::default();
        counter.set_value(1.0);
        metric.set_counter(counter);
        metric.set_label(labels);
        family.set_metric(vec![metric]);
        family
    }

    fn label_names(family: &MetricFamily) -> Vec<&str> {
        family.get_metric()[0]
            .get_label()
            .iter()
            .map(|l| l.name())
            .collect()
    }

    fn label_value<'a>(family: &'a MetricFamily, name: &str) -> Option<&'a str> {
        family.get_metric()[0]
            .get_label()
            .iter()
            .find(|l| l.name() == name)
            .map(|l| l.value())
    }

    #[crate::test]
    fn cluster_name_lookup_attaches_name() {
        let mut family = family_with_labels(vec![label("cluster_id", "u1")]);
        let catalog = FakeCatalog::default().with_cluster("u1", "quickstart");
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        rule.apply(&mut family, &catalog);
        assert_eq!(label_names(&family), vec!["cluster_id", "cluster_name"]);
        assert_eq!(label_value(&family, "cluster_name"), Some("quickstart"));
    }

    #[crate::test]
    fn replica_name_lookup_attaches_name() {
        let mut family =
            family_with_labels(vec![label("cluster_id", "u1"), label("replica_id", "u2")]);
        let catalog = FakeCatalog::default().with_replica("u1", "u2", "r1");
        let rule = Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        rule.apply(&mut family, &catalog);
        assert_eq!(
            label_names(&family),
            vec!["cluster_id", "replica_id", "replica_name"]
        );
        assert_eq!(label_value(&family, "replica_name"), Some("r1"));
    }

    #[crate::test]
    fn replica_name_lookup_skips_when_replica_id_missing() {
        // Only the cluster ID is present; the replica rule needs both.
        let mut family = family_with_labels(vec![label("cluster_id", "u1")]);
        let catalog = FakeCatalog::default().with_replica("u1", "u2", "r1");
        let rule = Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        rule.apply(&mut family, &catalog);
        assert_eq!(label_names(&family), vec!["cluster_id"]);
    }

    #[crate::test]
    fn object_name_lookup_attaches_name_with_custom_output() {
        let mut family = family_with_labels(vec![label("collection_id", "s100")]);
        let catalog =
            FakeCatalog::default().with_object("s100", "my_source", "public", "materialize");
        let rule = Rule::object_name_lookup_with_default_labels("collection_id", "source_name");
        rule.apply(&mut family, &catalog);
        assert_eq!(
            label_names(&family),
            vec![
                "collection_id",
                "source_name",
                "schema_name",
                "database_name"
            ]
        );
        assert_eq!(label_value(&family, "source_name"), Some("my_source"));
        assert_eq!(label_value(&family, "schema_name"), Some("public"));
        assert_eq!(label_value(&family, "database_name"), Some("materialize"));
    }

    #[crate::test]
    fn object_name_lookup_emits_empty_database_for_system_object() {
        let mut family = family_with_labels(vec![label("collection_id", "s1")]);
        // System/ambient objects have no database; the lookup yields "".
        let catalog = FakeCatalog::default().with_object("s1", "mz_objects", "mz_catalog", "");
        let rule = Rule::object_name_lookup_with_default_labels("collection_id", "object_name");
        rule.apply(&mut family, &catalog);
        assert_eq!(label_value(&family, "object_name"), Some("mz_objects"));
        assert_eq!(label_value(&family, "schema_name"), Some("mz_catalog"));
        // The database label is still emitted, with an empty value.
        assert_eq!(label_value(&family, "database_name"), Some(""));
    }

    #[crate::test]
    fn object_name_lookup_skips_unknown_object() {
        let mut family = family_with_labels(vec![label("collection_id", "u404")]);
        let catalog =
            FakeCatalog::default().with_object("s100", "my_source", "public", "materialize");
        let rule = Rule::object_name_lookup_with_default_labels("collection_id", "source_name");
        rule.apply(&mut family, &catalog);
        assert_eq!(label_names(&family), vec!["collection_id"]);
    }

    #[crate::test]
    fn object_name_lookup_with_distinct_labels_avoids_collision() {
        // Two object lookups on one metric: the second uses distinct schema /
        // database label names so it doesn't collide with the first's defaults.
        let mut family = family_with_labels(vec![
            label("source_id", "s1"),
            label("parent_source_id", "s2"),
        ]);
        let catalog = FakeCatalog::default()
            .with_object("s1", "child", "public", "materialize")
            .with_object("s2", "parent", "other_schema", "other_db");
        let source_rule = Rule::object_name_lookup_with_default_labels("source_id", "source_name");
        let parent_rule = Rule::ObjectNameLookup {
            object_id_label: "parent_source_id".into(),
            output_object_label: "parent_source_name".into(),
            output_schema_label: "parent_source_schema_name".into(),
            output_database_label: "parent_source_database_name".into(),
        };
        source_rule.apply(&mut family, &catalog);
        parent_rule.apply(&mut family, &catalog);
        assert_eq!(label_value(&family, "source_name"), Some("child"));
        assert_eq!(label_value(&family, "schema_name"), Some("public"));
        assert_eq!(label_value(&family, "database_name"), Some("materialize"));
        assert_eq!(label_value(&family, "parent_source_name"), Some("parent"));
        assert_eq!(
            label_value(&family, "parent_source_schema_name"),
            Some("other_schema")
        );
        assert_eq!(
            label_value(&family, "parent_source_database_name"),
            Some("other_db")
        );
    }

    #[crate::test]
    fn apply_skips_when_input_label_missing() {
        let mut family = family_with_labels(vec![label("unrelated", "x")]);
        let catalog = FakeCatalog::default().with_cluster("u1", "quickstart");
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        rule.apply(&mut family, &catalog);
        assert_eq!(label_names(&family), vec!["unrelated"]);
    }

    #[crate::test]
    fn apply_skips_when_lookup_returns_none() {
        let mut family = family_with_labels(vec![label("cluster_id", "u99")]);
        let catalog = FakeCatalog::default(); // no clusters
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        rule.apply(&mut family, &catalog);
        assert_eq!(label_names(&family), vec!["cluster_id"]);
    }

    #[crate::test]
    fn apply_skips_when_output_label_already_present() {
        let mut family = family_with_labels(vec![
            label("cluster_id", "u1"),
            label("cluster_name", "preset"),
        ]);
        let catalog = FakeCatalog::default().with_cluster("u1", "quickstart");
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        rule.apply(&mut family, &catalog);
        // Existing value preserved; not overwritten, not duplicated.
        assert_eq!(label_names(&family), vec!["cluster_id", "cluster_name"]);
        assert_eq!(label_value(&family, "cluster_name"), Some("preset"));
    }

    #[crate::test]
    fn apply_enriches_every_metric_in_family() {
        let mut family = family_with_labels(vec![label("cluster_id", "u1")]);
        let mut second = family.get_metric()[0].clone();
        second.set_label(vec![label("cluster_id", "u2")]);
        family.mut_metric().push(second);
        let catalog = FakeCatalog::default()
            .with_cluster("u1", "quickstart")
            .with_cluster("u2", "prod");
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        rule.apply(&mut family, &catalog);
        let names: Vec<Option<&str>> = family
            .get_metric()
            .iter()
            .map(|m| {
                m.get_label()
                    .iter()
                    .find(|l| l.name() == "cluster_name")
                    .map(|l| l.value())
            })
            .collect();
        assert_eq!(names, vec![Some("quickstart"), Some("prod")]);
    }

    #[crate::test]
    fn two_rules() {
        let mut family =
            family_with_labels(vec![label("cluster_id", "u1"), label("replica_id", "u2")]);
        let catalog = FakeCatalog::default()
            .with_cluster("u1", "quickstart")
            .with_replica("u1", "u2", "r1");
        let cluster_rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let replica_rule = Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        cluster_rule.apply(&mut family, &catalog);
        replica_rule.apply(&mut family, &catalog);
        assert_eq!(label_value(&family, "cluster_name"), Some("quickstart"));
        assert_eq!(label_value(&family, "replica_name"), Some("r1"));
    }
}