1use std::collections::btree_map::Entry as BEntry;
5use std::collections::hash_map::Entry as HEntry;
6use std::collections::{BTreeMap, HashMap, HashSet};
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10
11use crate::errors::{Error, Result};
12use crate::metrics::Collector;
13use crate::proto;
14
15use cfg_if::cfg_if;
16use lazy_static::lazy_static;
17
18#[derive(Default)]
19struct RegistryCore {
20 pub collectors_by_id: HashMap<u64, Box<dyn Collector>>,
21 pub dim_hashes_by_name: HashMap<String, u64>,
22 pub desc_ids: HashSet<u64>,
23 pub labels: Option<HashMap<String, String>>,
25 pub prefix: Option<String>,
27}
28
29impl std::fmt::Debug for RegistryCore {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 write!(
32 f,
33 "RegistryCore ({} collectors)",
34 self.collectors_by_id.keys().len()
35 )
36 }
37}
38
39impl RegistryCore {
40 fn register(&mut self, c: Box<dyn Collector>) -> Result<()> {
41 let mut desc_id_set = HashSet::new();
42 let mut collector_id: u64 = 0;
43
44 for desc in c.desc() {
45 if self.desc_ids.contains(&desc.id) {
48 return Err(Error::AlreadyReg);
49 }
50
51 if let Some(hash) = self.dim_hashes_by_name.get(&desc.fq_name) {
52 if *hash != desc.dim_hash {
53 return Err(Error::Msg(format!(
54 "a previously registered descriptor with the \
55 same fully-qualified name as {:?} has \
56 different label names or a different help \
57 string",
58 desc
59 )));
60 }
61 }
62
63 self.dim_hashes_by_name
64 .insert(desc.fq_name.clone(), desc.dim_hash);
65
66 if desc_id_set.insert(desc.id) {
69 collector_id = collector_id.wrapping_add(desc.id);
71 } else {
72 return Err(Error::Msg(format!(
76 "a duplicate descriptor within the same \
77 collector the same fully-qualified name: {:?}",
78 desc.fq_name
79 )));
80 }
81 }
82
83 match self.collectors_by_id.entry(collector_id) {
84 HEntry::Vacant(vc) => {
85 self.desc_ids.extend(desc_id_set);
86 vc.insert(c);
87 Ok(())
88 }
89 HEntry::Occupied(_) => Err(Error::AlreadyReg),
90 }
91 }
92
93 fn unregister(&mut self, c: Box<dyn Collector>) -> Result<()> {
94 let mut id_set = Vec::new();
95 let mut collector_id: u64 = 0;
96 for desc in c.desc() {
97 if !id_set.iter().any(|id| *id == desc.id) {
98 id_set.push(desc.id);
99 collector_id = collector_id.wrapping_add(desc.id);
100 }
101 }
102
103 if self.collectors_by_id.remove(&collector_id).is_none() {
104 return Err(Error::Msg(format!(
105 "collector {:?} is not registered",
106 c.desc()
107 )));
108 }
109
110 for id in id_set {
111 self.desc_ids.remove(&id);
112 }
113
114 Ok(())
117 }
118
119 fn gather(&self) -> Vec<proto::MetricFamily> {
120 let mut mf_by_name = BTreeMap::new();
121
122 for c in self.collectors_by_id.values() {
123 let mfs = c.collect();
124 for mut mf in mfs {
125 if mf.get_metric().is_empty() {
127 continue;
128 }
129
130 let name = mf.get_name().to_owned();
131 match mf_by_name.entry(name) {
132 BEntry::Vacant(entry) => {
133 entry.insert(mf);
134 }
135 BEntry::Occupied(mut entry) => {
136 let existent_mf = entry.get_mut();
137 let existent_metrics = existent_mf.mut_metric();
138
139 for metric in mf.take_metric().into_iter() {
142 existent_metrics.push(metric);
143 }
144 }
145 }
146 }
147 }
148
149 for mf in mf_by_name.values_mut() {
154 mf.mut_metric().sort_by(|m1, m2| {
155 let lps1 = m1.get_label();
156 let lps2 = m2.get_label();
157
158 if lps1.len() != lps2.len() {
159 return lps1.len().cmp(&lps2.len());
166 }
167
168 for (lp1, lp2) in lps1.iter().zip(lps2.iter()) {
169 if lp1.get_value() != lp2.get_value() {
170 return lp1.get_value().cmp(lp2.get_value());
171 }
172 }
173
174 m1.get_timestamp_ms().cmp(&m2.get_timestamp_ms())
181 });
182 }
183
184 mf_by_name
186 .into_values()
187 .map(|mut m| {
188 if let Some(ref namespace) = self.prefix {
190 let prefixed = format!("{}_{}", namespace, m.get_name());
191 m.set_name(prefixed);
192 }
193
194 if let Some(ref hmap) = self.labels {
196 let pairs: Vec<proto::LabelPair> = hmap
197 .iter()
198 .map(|(k, v)| {
199 let mut label = proto::LabelPair::default();
200 label.set_name(k.to_string());
201 label.set_value(v.to_string());
202 label
203 })
204 .collect();
205
206 for metric in m.mut_metric().iter_mut() {
207 let mut labels: Vec<_> = metric.take_label().into();
208 labels.append(&mut pairs.clone());
209 metric.set_label(labels.into());
210 }
211 }
212 m
213 })
214 .collect()
215 }
216}
217
218#[derive(Clone, Default, Debug)]
221pub struct Registry {
222 r: Arc<RwLock<RegistryCore>>,
223}
224
225impl Registry {
226 pub fn new() -> Registry {
228 Default::default()
229 }
230
231 pub fn new_custom(
233 prefix: Option<String>,
234 labels: Option<HashMap<String, String>>,
235 ) -> Result<Registry> {
236 if let Some(ref namespace) = prefix {
237 if namespace.is_empty() {
238 return Err(Error::Msg("empty prefix namespace".to_string()));
239 }
240 }
241
242 let reg = Registry::default();
243 {
244 let mut core = reg.r.write();
245 core.prefix = prefix;
246 core.labels = labels;
247 }
248 Ok(reg)
249 }
250
251 pub fn register(&self, c: Box<dyn Collector>) -> Result<()> {
261 self.r.write().register(c)
262 }
263
264 pub fn unregister(&self, c: Box<dyn Collector>) -> Result<()> {
269 self.r.write().unregister(c)
270 }
271
272 pub fn gather(&self) -> Vec<proto::MetricFamily> {
276 self.r.read().gather()
277 }
278}
279
280cfg_if! {
281 if #[cfg(all(feature = "process", target_os="linux"))] {
282 fn register_default_process_collector(reg: &Registry) -> Result<()> {
283 use crate::process_collector::ProcessCollector;
284
285 let pc = ProcessCollector::for_self();
286 reg.register(Box::new(pc))
287 }
288 } else {
289 fn register_default_process_collector(_: &Registry) -> Result<()> {
290 Ok(())
291 }
292 }
293}
294
295lazy_static! {
297 static ref DEFAULT_REGISTRY: Registry = {
298 let reg = Registry::default();
299
300 register_default_process_collector(®).unwrap();
302
303 reg
304 };
305}
306
307pub fn default_registry() -> &'static Registry {
309 lazy_static::initialize(&DEFAULT_REGISTRY);
310 &DEFAULT_REGISTRY
311}
312
313pub fn register(c: Box<dyn Collector>) -> Result<()> {
319 DEFAULT_REGISTRY.register(c)
320}
321
322pub fn unregister(c: Box<dyn Collector>) -> Result<()> {
327 DEFAULT_REGISTRY.unregister(c)
328}
329
330pub fn gather() -> Vec<proto::MetricFamily> {
332 DEFAULT_REGISTRY.gather()
333}
334
335#[cfg(test)]
336mod tests {
337 use std::collections::HashMap;
338 use std::thread;
339
340 use super::*;
341 use crate::counter::{Counter, CounterVec};
342 use crate::desc::Desc;
343 use crate::metrics::{Collector, Opts};
344 use crate::proto;
345
346 #[test]
347 fn test_registry() {
348 let r = Registry::new();
349
350 let counter = Counter::new("test", "test help").unwrap();
351 r.register(Box::new(counter.clone())).unwrap();
352 counter.inc();
353
354 let r1 = r.clone();
355 let handler = thread::spawn(move || {
356 let metric_families = r1.gather();
357 assert_eq!(metric_families.len(), 1);
358 });
359
360 assert!(handler.join().is_ok());
361
362 assert!(r.register(Box::new(counter.clone())).is_err());
363 assert!(r.unregister(Box::new(counter.clone())).is_ok());
364 assert!(r.unregister(Box::new(counter.clone())).is_err());
365 assert!(r.register(Box::new(counter.clone())).is_ok());
366
367 let counter_vec =
368 CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
369
370 r.register(Box::new(counter_vec.clone())).unwrap();
371 counter_vec.with_label_values(&["1", "2"]).inc();
372 }
373
374 #[test]
375 fn test_default_registry() {
376 let counter = Counter::new("test", "test help").unwrap();
377
378 assert!(register(Box::new(counter.clone())).is_ok());
379 assert_ne!(gather().len(), 0);
380 assert_ne!(default_registry().gather().len(), 0);
381 assert_eq!(gather().len(), default_registry().gather().len());
382
383 assert!(unregister(Box::new(counter.clone())).is_ok());
384 assert!(unregister(Box::new(counter.clone())).is_err());
385 assert!(default_registry()
386 .unregister(Box::new(counter.clone()))
387 .is_err());
388 assert!(register(Box::new(counter.clone())).is_ok());
389 }
390
391 #[test]
392 fn test_gather_order() {
393 let r = Registry::new();
394
395 let counter_a = Counter::new("test_a_counter", "test help").unwrap();
396 let counter_b = Counter::new("test_b_counter", "test help").unwrap();
397 let counter_2 = Counter::new("test_2_counter", "test help").unwrap();
398 r.register(Box::new(counter_b.clone())).unwrap();
399 r.register(Box::new(counter_2.clone())).unwrap();
400 r.register(Box::new(counter_a.clone())).unwrap();
401
402 let mfs = r.gather();
403 assert_eq!(mfs.len(), 3);
404 assert_eq!(mfs[0].get_name(), "test_2_counter");
405 assert_eq!(mfs[1].get_name(), "test_a_counter");
406 assert_eq!(mfs[2].get_name(), "test_b_counter");
407
408 let r = Registry::new();
409 let opts = Opts::new("test", "test help")
410 .const_label("a", "1")
411 .const_label("b", "2");
412 let counter_vec = CounterVec::new(opts, &["cc", "c1", "a2", "c0"]).unwrap();
413 r.register(Box::new(counter_vec.clone())).unwrap();
414
415 let mut map1 = HashMap::new();
416 map1.insert("cc", "12");
417 map1.insert("c1", "a1");
418 map1.insert("a2", "0");
419 map1.insert("c0", "hello");
420 counter_vec.with(&map1).inc();
421
422 let mut map2 = HashMap::new();
423 map2.insert("cc", "12");
424 map2.insert("c1", "0");
425 map2.insert("a2", "0");
426 map2.insert("c0", "hello");
427 counter_vec.with(&map2).inc();
428 counter_vec.with(&map2).inc();
429
430 let mut map3 = HashMap::new();
431 map3.insert("cc", "12");
432 map3.insert("c1", "0");
433 map3.insert("a2", "da");
434 map3.insert("c0", "hello");
435 counter_vec.with(&map3).inc();
436 counter_vec.with(&map3).inc();
437 counter_vec.with(&map3).inc();
438
439 let mut map4 = HashMap::new();
440 map4.insert("cc", "12");
441 map4.insert("c1", "0");
442 map4.insert("a2", "da");
443 map4.insert("c0", "你好");
444 counter_vec.with(&map4).inc();
445 counter_vec.with(&map4).inc();
446 counter_vec.with(&map4).inc();
447 counter_vec.with(&map4).inc();
448
449 let mfs = r.gather();
457 assert_eq!(mfs.len(), 1);
458 let ms = mfs[0].get_metric();
459 assert_eq!(ms.len(), 4);
460 assert_eq!(ms[0].get_counter().get_value() as u64, 2);
461 assert_eq!(ms[1].get_counter().get_value() as u64, 1);
462 assert_eq!(ms[2].get_counter().get_value() as u64, 3);
463 assert_eq!(ms[3].get_counter().get_value() as u64, 4);
464 }
465
466 #[test]
467 fn test_with_prefix_gather() {
468 assert!(Registry::new_custom(Some("".to_string()), None).is_err());
469
470 let r = Registry::new_custom(Some("common_prefix".to_string()), None).unwrap();
471 let counter_a = Counter::new("test_a_counter", "test help").unwrap();
472 r.register(Box::new(counter_a.clone())).unwrap();
473
474 let mfs = r.gather();
475 assert_eq!(mfs.len(), 1);
476 assert_eq!(mfs[0].get_name(), "common_prefix_test_a_counter");
477 }
478
479 #[test]
480 fn test_with_labels_gather() {
481 let mut labels = HashMap::new();
482 labels.insert("tkey".to_string(), "tvalue".to_string());
483
484 let r = Registry::new_custom(None, Some(labels)).unwrap();
485 let counter_a = Counter::new("test_a_counter", "test help").unwrap();
486 r.register(Box::new(counter_a.clone())).unwrap();
487 let counter_vec =
488 CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
489 r.register(Box::new(counter_vec.clone())).unwrap();
490
491 counter_vec.with_label_values(&["one", "two"]).inc();
492 counter_vec.with_label_values(&["three", "four"]).inc();
493
494 let mfs = r.gather();
495 assert_eq!(mfs.len(), 2);
496 assert_eq!(mfs[0].get_name(), "test_a_counter");
497 assert_eq!(mfs[1].get_name(), "test_vec");
498
499 let mut needle = proto::LabelPair::default();
500 needle.set_name("tkey".to_string());
501 needle.set_value("tvalue".to_string());
502 let metrics = mfs[0].get_metric();
503 for m in metrics {
504 assert!(m.get_label().contains(&needle));
505 }
506 let metrics = mfs[1].get_metric();
507 for m in metrics {
508 assert!(m.get_label().contains(&needle));
509 }
510 }
511
512 struct MultipleCollector {
513 descs: Vec<Desc>,
514 counters: Vec<Counter>,
515 }
516
517 impl Collector for MultipleCollector {
518 fn desc(&self) -> Vec<&Desc> {
519 self.descs.iter().collect()
520 }
521
522 fn collect(&self) -> Vec<proto::MetricFamily> {
523 self.counters
524 .iter()
525 .inspect(|c| c.inc())
526 .map(|c| c.collect())
527 .fold(Vec::new(), |mut acc, mfs| {
528 acc.extend(mfs);
529 acc
530 })
531 }
532 }
533
534 #[test]
535 fn test_register_multiplecollector() {
536 let counters = vec![
537 Counter::new("c1", "c1 is a counter").unwrap(),
538 Counter::new("c2", "c2 is a counter").unwrap(),
539 ];
540
541 let descs = counters.iter().map(|c| c.desc().into_iter().cloned()).fold(
542 Vec::new(),
543 |mut acc, ds| {
544 acc.extend(ds);
545 acc
546 },
547 );
548
549 let mc = MultipleCollector { descs, counters };
550
551 let r = Registry::new();
552 r.register(Box::new(mc)).unwrap();
553 }
554
555 #[test]
556 fn test_prune_empty_metric_family() {
557 let counter_vec =
558 CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
559 let r = Registry::new();
560 r.register(Box::new(counter_vec.clone())).unwrap();
561 assert!(r.gather().is_empty());
562 counter_vec.with_label_values(&["1", "2"]).inc();
563 assert!(!r.gather().is_empty());
564 }
565}