1use std::collections::BTreeMap;
25
26use prometheus::proto::{LabelPair, MetricFamily};
27use serde::{Deserialize, Serialize};
28
/// The name components of a catalog object, as returned by
/// [`NameLookup::object_name`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct ObjectName {
    /// Name of the database the object belongs to.
    pub database: String,
    /// Name of the schema the object belongs to.
    pub schema: String,
    /// Name of the object itself.
    pub object: String,
}
39
40pub trait NameLookup {
42 fn cluster_name(&self, cluster_id: &str) -> Option<String>;
44 fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option<String>;
46 fn object_name(&self, global_id: &str) -> Option<ObjectName>;
49}
50
51#[derive(
56 Clone,
57 Debug,
58 Serialize,
59 Deserialize,
60 PartialEq,
61 Eq,
62 PartialOrd,
63 Ord,
64 Hash
65)]
66#[serde(rename_all = "snake_case", tag = "kind")]
67pub enum Rule {
68 ClusterNameLookup {
71 cluster_id_label: String,
73 output_label: String,
75 },
76 ReplicaNameLookup {
80 cluster_id_label: String,
82 replica_id_label: String,
84 output_label: String,
86 },
87 ObjectNameLookup {
92 object_id_label: String,
94 output_object_label: String,
96 output_schema_label: String,
98 output_database_label: String,
100 },
101}
102
103fn label_pair(name: String, value: String) -> LabelPair {
105 let mut pair = LabelPair::default();
106 pair.set_name(name);
107 pair.set_value(value);
108 pair
109}
110
111impl Rule {
112 pub fn object_name_lookup_with_default_labels(
116 object_id_label: impl Into<String>,
117 output_object_label: impl Into<String>,
118 ) -> Rule {
119 Rule::ObjectNameLookup {
120 object_id_label: object_id_label.into(),
121 output_object_label: output_object_label.into(),
122 output_schema_label: "schema_name".into(),
123 output_database_label: "database_name".into(),
124 }
125 }
126
127 fn resolve<L: NameLookup>(&self, labels: &BTreeMap<&str, &str>, lookup: &L) -> Vec<LabelPair> {
131 match self {
132 Rule::ClusterNameLookup {
133 cluster_id_label,
134 output_label,
135 } => labels
136 .get(cluster_id_label.as_str())
137 .copied()
138 .and_then(|cid| lookup.cluster_name(cid))
139 .map(|name| vec![label_pair(output_label.clone(), name)])
140 .unwrap_or_default(),
141 Rule::ReplicaNameLookup {
142 cluster_id_label,
143 replica_id_label,
144 output_label,
145 } => match (
146 labels.get(cluster_id_label.as_str()).copied(),
147 labels.get(replica_id_label.as_str()).copied(),
148 ) {
149 (Some(cid), Some(rid)) => lookup
150 .replica_name(cid, rid)
151 .map(|name| vec![label_pair(output_label.clone(), name)])
152 .unwrap_or_default(),
153 _ => Vec::new(),
154 },
155 Rule::ObjectNameLookup {
156 object_id_label,
157 output_object_label,
158 output_schema_label,
159 output_database_label,
160 } => {
161 let Some(name) = labels
162 .get(object_id_label.as_str())
163 .copied()
164 .and_then(|oid| lookup.object_name(oid))
165 else {
166 return Vec::new();
167 };
168 vec![
169 label_pair(output_object_label.clone(), name.object),
170 label_pair(output_schema_label.clone(), name.schema),
171 label_pair(output_database_label.clone(), name.database),
172 ]
173 }
174 }
175 }
176
177 pub fn apply<L: NameLookup>(&self, family: &mut MetricFamily, lookup: &L) {
181 for metric in family.mut_metric() {
182 let labels: BTreeMap<&str, &str> = metric
183 .get_label()
184 .iter()
185 .map(|l| (l.name(), l.value()))
186 .collect();
187 let labels_to_add: Vec<LabelPair> = self
188 .resolve(&labels, lookup)
189 .into_iter()
190 .filter(|l| !labels.contains_key(l.name()))
193 .collect();
194
195 let mut all = metric.take_label();
196 all.extend(labels_to_add);
197 metric.set_label(all);
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use prometheus::proto::{Counter, Metric, MetricFamily, MetricType};

    use super::*;

    /// In-memory [`NameLookup`] whose answers come from maps filled in through the
    /// `with_*` builder methods.
    #[derive(Default)]
    struct FakeCatalog {
        clusters: BTreeMap<String, String>,
        replicas: BTreeMap<(String, String), String>,
        objects: BTreeMap<String, ObjectName>,
    }

    impl FakeCatalog {
        /// Registers `name` as the name of cluster `id`.
        fn with_cluster(mut self, id: &str, name: &str) -> Self {
            self.clusters.insert(id.to_owned(), name.to_owned());
            self
        }

        /// Registers `name` as the name of replica `rid` of cluster `cid`.
        fn with_replica(mut self, cid: &str, rid: &str, name: &str) -> Self {
            let key = (cid.to_owned(), rid.to_owned());
            self.replicas.insert(key, name.to_owned());
            self
        }

        /// Registers the object/schema/database names of object `id`.
        fn with_object(mut self, id: &str, object: &str, schema: &str, database: &str) -> Self {
            let name = ObjectName {
                database: database.to_owned(),
                schema: schema.to_owned(),
                object: object.to_owned(),
            };
            self.objects.insert(id.to_owned(), name);
            self
        }
    }

    impl NameLookup for FakeCatalog {
        fn cluster_name(&self, cluster_id: &str) -> Option<String> {
            self.clusters.get(cluster_id).cloned()
        }

        fn replica_name(&self, cluster_id: &str, replica_id: &str) -> Option<String> {
            let key = (cluster_id.to_owned(), replica_id.to_owned());
            self.replicas.get(&key).cloned()
        }

        fn object_name(&self, global_id: &str) -> Option<ObjectName> {
            self.objects.get(global_id).cloned()
        }
    }

    /// Builds a label pair from string slices.
    fn label(name: &str, value: &str) -> LabelPair {
        let mut pair = LabelPair::default();
        pair.set_name(name.to_owned());
        pair.set_value(value.to_owned());
        pair
    }

    /// Builds a counter family named `test_metric` holding a single metric that
    /// carries `labels`.
    fn family_with_labels(labels: Vec<LabelPair>) -> MetricFamily {
        let mut counter = Counter::default();
        counter.set_value(1.0);

        let mut metric = Metric::default();
        metric.set_counter(counter);
        metric.set_label(labels);

        let mut family = MetricFamily::default();
        family.set_name("test_metric".into());
        family.set_field_type(MetricType::COUNTER);
        family.set_help("help for test_metric".into());
        family.set_metric(vec![metric]);
        family
    }

    /// Label names of the family's first metric, in stored order.
    fn label_names(family: &MetricFamily) -> Vec<&str> {
        let first = &family.get_metric()[0];
        first.get_label().iter().map(|pair| pair.name()).collect()
    }

    /// Value of the label called `name` on the family's first metric, if present.
    fn label_value<'a>(family: &'a MetricFamily, name: &str) -> Option<&'a str> {
        let first = &family.get_metric()[0];
        first
            .get_label()
            .iter()
            .find_map(|pair| (pair.name() == name).then(|| pair.value()))
    }

    /// A known cluster id gains a cluster name label, appended after the id label.
    #[crate::test]
    fn cluster_name_lookup_attaches_name() {
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let lookup = FakeCatalog::default().with_cluster("u1", "quickstart");
        let mut metrics = family_with_labels(vec![label("cluster_id", "u1")]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(label_names(&metrics), ["cluster_id", "cluster_name"]);
        assert_eq!(label_value(&metrics, "cluster_name"), Some("quickstart"));
    }

    /// A known (cluster id, replica id) pair gains a replica name label.
    #[crate::test]
    fn replica_name_lookup_attaches_name() {
        let rule = Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        let lookup = FakeCatalog::default().with_replica("u1", "u2", "r1");
        let mut metrics =
            family_with_labels(vec![label("cluster_id", "u1"), label("replica_id", "u2")]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(
            label_names(&metrics),
            ["cluster_id", "replica_id", "replica_name"]
        );
        assert_eq!(label_value(&metrics, "replica_name"), Some("r1"));
    }

    /// The default-label constructor honours the custom object label and appends the
    /// object, schema and database labels in that order.
    #[crate::test]
    fn object_name_lookup_attaches_name_with_custom_output() {
        let rule = Rule::object_name_lookup_with_default_labels("collection_id", "source_name");
        let lookup =
            FakeCatalog::default().with_object("s100", "my_source", "public", "materialize");
        let mut metrics = family_with_labels(vec![label("collection_id", "s100")]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(
            label_names(&metrics),
            [
                "collection_id",
                "source_name",
                "schema_name",
                "database_name"
            ]
        );
        assert_eq!(label_value(&metrics, "source_name"), Some("my_source"));
        assert_eq!(label_value(&metrics, "schema_name"), Some("public"));
        assert_eq!(label_value(&metrics, "database_name"), Some("materialize"));
    }

    /// An object whose database name is empty still gets a (blank) database label.
    #[crate::test]
    fn object_name_lookup_emits_empty_database_for_system_object() {
        let rule = Rule::object_name_lookup_with_default_labels("collection_id", "object_name");
        let lookup = FakeCatalog::default().with_object("s1", "mz_objects", "mz_catalog", "");
        let mut metrics = family_with_labels(vec![label("collection_id", "s1")]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(label_value(&metrics, "object_name"), Some("mz_objects"));
        assert_eq!(label_value(&metrics, "schema_name"), Some("mz_catalog"));
        assert_eq!(label_value(&metrics, "database_name"), Some(""));
    }

    /// Two object lookups on the same metric stay independent when each rule writes
    /// to its own set of output labels.
    #[crate::test]
    fn object_name_lookup_with_distinct_labels_avoids_collision() {
        let source_rule = Rule::object_name_lookup_with_default_labels("source_id", "source_name");
        let parent_rule = Rule::ObjectNameLookup {
            object_id_label: "parent_source_id".into(),
            output_object_label: "parent_source_name".into(),
            output_schema_label: "parent_source_schema_name".into(),
            output_database_label: "parent_source_database_name".into(),
        };
        let lookup = FakeCatalog::default()
            .with_object("s1", "child", "public", "materialize")
            .with_object("s2", "parent", "other_schema", "other_db");
        let mut metrics = family_with_labels(vec![
            label("source_id", "s1"),
            label("parent_source_id", "s2"),
        ]);

        source_rule.apply(&mut metrics, &lookup);
        parent_rule.apply(&mut metrics, &lookup);

        assert_eq!(label_value(&metrics, "source_name"), Some("child"));
        assert_eq!(label_value(&metrics, "schema_name"), Some("public"));
        assert_eq!(label_value(&metrics, "database_name"), Some("materialize"));
        assert_eq!(label_value(&metrics, "parent_source_name"), Some("parent"));
        assert_eq!(
            label_value(&metrics, "parent_source_schema_name"),
            Some("other_schema")
        );
        assert_eq!(
            label_value(&metrics, "parent_source_database_name"),
            Some("other_db")
        );
    }

    /// A metric without the rule's input label is left unchanged.
    #[crate::test]
    fn apply_skips_when_input_label_missing() {
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let lookup = FakeCatalog::default().with_cluster("u1", "quickstart");
        let mut metrics = family_with_labels(vec![label("unrelated", "x")]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(label_names(&metrics), ["unrelated"]);
    }

    /// An id the lookup does not know produces no output label.
    #[crate::test]
    fn apply_skips_when_lookup_returns_none() {
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        // The catalog is intentionally empty, so "u99" cannot be resolved.
        let lookup = FakeCatalog::default();
        let mut metrics = family_with_labels(vec![label("cluster_id", "u99")]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(label_names(&metrics), ["cluster_id"]);
    }

    /// A pre-existing label with the output name keeps its value and is not duplicated.
    #[crate::test]
    fn apply_skips_when_output_label_already_present() {
        let rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let lookup = FakeCatalog::default().with_cluster("u1", "quickstart");
        let mut metrics = family_with_labels(vec![
            label("cluster_id", "u1"),
            label("cluster_name", "preset"),
        ]);

        rule.apply(&mut metrics, &lookup);

        assert_eq!(label_names(&metrics), ["cluster_id", "cluster_name"]);
        assert_eq!(label_value(&metrics, "cluster_name"), Some("preset"));
    }

    /// A cluster rule and a replica rule applied in sequence both contribute labels.
    #[crate::test]
    fn two_rules() {
        let cluster_rule = Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let replica_rule = Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        let lookup = FakeCatalog::default()
            .with_cluster("u1", "quickstart")
            .with_replica("u1", "u2", "r1");
        let mut metrics =
            family_with_labels(vec![label("cluster_id", "u1"), label("replica_id", "u2")]);

        cluster_rule.apply(&mut metrics, &lookup);
        replica_rule.apply(&mut metrics, &lookup);

        assert_eq!(label_value(&metrics, "cluster_name"), Some("quickstart"));
        assert_eq!(label_value(&metrics, "replica_name"), Some("r1"));
    }
}