prometheus/
registry.rs

1// Copyright 2014 The Prometheus Authors
2// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
3
4use std::collections::btree_map::Entry as BEntry;
5use std::collections::hash_map::Entry as HEntry;
6use std::collections::{BTreeMap, HashMap, HashSet};
7use std::sync::Arc;
8
9use parking_lot::RwLock;
10
11use crate::errors::{Error, Result};
12use crate::metrics::Collector;
13use crate::proto;
14
15use cfg_if::cfg_if;
16use lazy_static::lazy_static;
17
18#[derive(Default)]
19struct RegistryCore {
20    pub collectors_by_id: HashMap<u64, Box<dyn Collector>>,
21    pub dim_hashes_by_name: HashMap<String, u64>,
22    pub desc_ids: HashSet<u64>,
23    /// Optional common labels for all registered collectors.
24    pub labels: Option<HashMap<String, String>>,
25    /// Optional common namespace for all registered collectors.
26    pub prefix: Option<String>,
27}
28
29impl std::fmt::Debug for RegistryCore {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        write!(
32            f,
33            "RegistryCore ({} collectors)",
34            self.collectors_by_id.keys().len()
35        )
36    }
37}
38
39impl RegistryCore {
40    fn register(&mut self, c: Box<dyn Collector>) -> Result<()> {
41        let mut desc_id_set = HashSet::new();
42        let mut collector_id: u64 = 0;
43
44        for desc in c.desc() {
45            // Is the desc_id unique?
46            // (In other words: Is the fqName + constLabel combination unique?)
47            if self.desc_ids.contains(&desc.id) {
48                return Err(Error::AlreadyReg);
49            }
50
51            if let Some(hash) = self.dim_hashes_by_name.get(&desc.fq_name) {
52                if *hash != desc.dim_hash {
53                    return Err(Error::Msg(format!(
54                        "a previously registered descriptor with the \
55                         same fully-qualified name as {:?} has \
56                         different label names or a different help \
57                         string",
58                        desc
59                    )));
60                }
61            }
62
63            self.dim_hashes_by_name
64                .insert(desc.fq_name.clone(), desc.dim_hash);
65
66            // If it is not a duplicate desc in this collector, add it to
67            // the collector_id.
68            if desc_id_set.insert(desc.id) {
69                // The set did not have this value present, true is returned.
70                collector_id = collector_id.wrapping_add(desc.id);
71            } else {
72                // The set did have this value present, false is returned.
73                //
74                // TODO: Should we allow duplicate descs within the same collector?
75                return Err(Error::Msg(format!(
76                    "a duplicate descriptor within the same \
77                     collector the same fully-qualified name: {:?}",
78                    desc.fq_name
79                )));
80            }
81        }
82
83        match self.collectors_by_id.entry(collector_id) {
84            HEntry::Vacant(vc) => {
85                self.desc_ids.extend(desc_id_set);
86                vc.insert(c);
87                Ok(())
88            }
89            HEntry::Occupied(_) => Err(Error::AlreadyReg),
90        }
91    }
92
93    fn unregister(&mut self, c: Box<dyn Collector>) -> Result<()> {
94        let mut id_set = Vec::new();
95        let mut collector_id: u64 = 0;
96        for desc in c.desc() {
97            if !id_set.iter().any(|id| *id == desc.id) {
98                id_set.push(desc.id);
99                collector_id = collector_id.wrapping_add(desc.id);
100            }
101        }
102
103        if self.collectors_by_id.remove(&collector_id).is_none() {
104            return Err(Error::Msg(format!(
105                "collector {:?} is not registered",
106                c.desc()
107            )));
108        }
109
110        for id in id_set {
111            self.desc_ids.remove(&id);
112        }
113
114        // dim_hashes_by_name is left untouched as those must be consistent
115        // throughout the lifetime of a program.
116        Ok(())
117    }
118
119    fn gather(&self) -> Vec<proto::MetricFamily> {
120        let mut mf_by_name = BTreeMap::new();
121
122        for c in self.collectors_by_id.values() {
123            let mfs = c.collect();
124            for mut mf in mfs {
125                // Prune empty MetricFamilies.
126                if mf.get_metric().is_empty() {
127                    continue;
128                }
129
130                let name = mf.get_name().to_owned();
131                match mf_by_name.entry(name) {
132                    BEntry::Vacant(entry) => {
133                        entry.insert(mf);
134                    }
135                    BEntry::Occupied(mut entry) => {
136                        let existent_mf = entry.get_mut();
137                        let existent_metrics = existent_mf.mut_metric();
138
139                        // TODO: check type.
140                        // TODO: check consistency.
141                        for metric in mf.take_metric().into_iter() {
142                            existent_metrics.push(metric);
143                        }
144                    }
145                }
146            }
147        }
148
149        // TODO: metric_family injection hook.
150
151        // Now that MetricFamilies are all set, sort their Metrics
152        // lexicographically by their label values.
153        for mf in mf_by_name.values_mut() {
154            mf.mut_metric().sort_by(|m1, m2| {
155                let lps1 = m1.get_label();
156                let lps2 = m2.get_label();
157
158                if lps1.len() != lps2.len() {
159                    // This should not happen. The metrics are
160                    // inconsistent. However, we have to deal with the fact, as
161                    // people might use custom collectors or metric family injection
162                    // to create inconsistent metrics. So let's simply compare the
163                    // number of labels in this case. That will still yield
164                    // reproducible sorting.
165                    return lps1.len().cmp(&lps2.len());
166                }
167
168                for (lp1, lp2) in lps1.iter().zip(lps2.iter()) {
169                    if lp1.get_value() != lp2.get_value() {
170                        return lp1.get_value().cmp(lp2.get_value());
171                    }
172                }
173
174                // We should never arrive here. Multiple metrics with the same
175                // label set in the same scrape will lead to undefined ingestion
176                // behavior. However, as above, we have to provide stable sorting
177                // here, even for inconsistent metrics. So sort equal metrics
178                // by their timestamp, with missing timestamps (implying "now")
179                // coming last.
180                m1.get_timestamp_ms().cmp(&m2.get_timestamp_ms())
181            });
182        }
183
184        // Write out MetricFamilies sorted by their name.
185        mf_by_name
186            .into_values()
187            .map(|mut m| {
188                // Add registry namespace prefix, if any.
189                if let Some(ref namespace) = self.prefix {
190                    let prefixed = format!("{}_{}", namespace, m.get_name());
191                    m.set_name(prefixed);
192                }
193
194                // Add registry common labels, if any.
195                if let Some(ref hmap) = self.labels {
196                    let pairs: Vec<proto::LabelPair> = hmap
197                        .iter()
198                        .map(|(k, v)| {
199                            let mut label = proto::LabelPair::default();
200                            label.set_name(k.to_string());
201                            label.set_value(v.to_string());
202                            label
203                        })
204                        .collect();
205
206                    for metric in m.mut_metric().iter_mut() {
207                        let mut labels: Vec<_> = metric.take_label().into();
208                        labels.append(&mut pairs.clone());
209                        metric.set_label(labels.into());
210                    }
211                }
212                m
213            })
214            .collect()
215    }
216}
217
218/// A struct for registering Prometheus collectors, collecting their metrics, and gathering
219/// them into `MetricFamilies` for exposition.
220#[derive(Clone, Default, Debug)]
221pub struct Registry {
222    r: Arc<RwLock<RegistryCore>>,
223}
224
225impl Registry {
226    /// `new` creates a Registry.
227    pub fn new() -> Registry {
228        Default::default()
229    }
230
231    /// Create a new registry, with optional custom prefix and labels.
232    pub fn new_custom(
233        prefix: Option<String>,
234        labels: Option<HashMap<String, String>>,
235    ) -> Result<Registry> {
236        if let Some(ref namespace) = prefix {
237            if namespace.is_empty() {
238                return Err(Error::Msg("empty prefix namespace".to_string()));
239            }
240        }
241
242        let reg = Registry::default();
243        {
244            let mut core = reg.r.write();
245            core.prefix = prefix;
246            core.labels = labels;
247        }
248        Ok(reg)
249    }
250
251    /// `register` registers a new [`Collector`] to be included in metrics
252    /// collection. It returns an error if the descriptors provided by the
253    /// [`Collector`] are invalid or if they — in combination with descriptors of
254    /// already registered Collectors — do not fulfill the consistency and
255    /// uniqueness criteria described in the documentation of [`Desc`](crate::core::Desc).
256    ///
257    /// If the provided [`Collector`] is equal to a [`Collector`] already registered
258    /// (which includes the case of re-registering the same [`Collector`]), the
259    /// AlreadyReg error returns.
260    pub fn register(&self, c: Box<dyn Collector>) -> Result<()> {
261        self.r.write().register(c)
262    }
263
264    /// `unregister` unregisters the [`Collector`] that equals the [`Collector`] passed
265    /// in as an argument.  (Two Collectors are considered equal if their
266    /// Describe method yields the same set of descriptors.) The function
267    /// returns error when the [`Collector`] is not registered.
268    pub fn unregister(&self, c: Box<dyn Collector>) -> Result<()> {
269        self.r.write().unregister(c)
270    }
271
272    /// `gather` calls the Collect method of the registered Collectors and then
273    /// gathers the collected metrics into a lexicographically sorted slice
274    /// of MetricFamily protobufs.
275    pub fn gather(&self) -> Vec<proto::MetricFamily> {
276        self.r.read().gather()
277    }
278}
279
280cfg_if! {
281    if #[cfg(all(feature = "process", target_os="linux"))] {
282        fn register_default_process_collector(reg: &Registry) -> Result<()> {
283            use crate::process_collector::ProcessCollector;
284
285            let pc = ProcessCollector::for_self();
286            reg.register(Box::new(pc))
287        }
288    } else {
289        fn register_default_process_collector(_: &Registry) -> Result<()> {
290            Ok(())
291        }
292    }
293}
294
295// Default registry for rust-prometheus.
296lazy_static! {
297    static ref DEFAULT_REGISTRY: Registry = {
298        let reg = Registry::default();
299
300        // Register a default process collector.
301        register_default_process_collector(&reg).unwrap();
302
303        reg
304    };
305}
306
307/// Default registry (global static).
308pub fn default_registry() -> &'static Registry {
309    lazy_static::initialize(&DEFAULT_REGISTRY);
310    &DEFAULT_REGISTRY
311}
312
313/// Registers a new [`Collector`] to be included in metrics collection. It
314/// returns an error if the descriptors provided by the [`Collector`] are invalid or
315/// if they - in combination with descriptors of already registered Collectors -
316/// do not fulfill the consistency and uniqueness criteria described in the
317/// [`Desc`](crate::core::Desc) documentation.
318pub fn register(c: Box<dyn Collector>) -> Result<()> {
319    DEFAULT_REGISTRY.register(c)
320}
321
322/// Unregisters the [`Collector`] that equals the [`Collector`] passed in as
323/// an argument. (Two Collectors are considered equal if their Describe method
324/// yields the same set of descriptors.) The function returns an error if a
325/// [`Collector`] was not registered.
326pub fn unregister(c: Box<dyn Collector>) -> Result<()> {
327    DEFAULT_REGISTRY.unregister(c)
328}
329
330/// Return all `MetricFamily` of `DEFAULT_REGISTRY`.
331pub fn gather() -> Vec<proto::MetricFamily> {
332    DEFAULT_REGISTRY.gather()
333}
334
335#[cfg(test)]
336mod tests {
337    use std::collections::HashMap;
338    use std::thread;
339
340    use super::*;
341    use crate::counter::{Counter, CounterVec};
342    use crate::desc::Desc;
343    use crate::metrics::{Collector, Opts};
344    use crate::proto;
345
346    #[test]
347    fn test_registry() {
348        let r = Registry::new();
349
350        let counter = Counter::new("test", "test help").unwrap();
351        r.register(Box::new(counter.clone())).unwrap();
352        counter.inc();
353
354        let r1 = r.clone();
355        let handler = thread::spawn(move || {
356            let metric_families = r1.gather();
357            assert_eq!(metric_families.len(), 1);
358        });
359
360        assert!(handler.join().is_ok());
361
362        assert!(r.register(Box::new(counter.clone())).is_err());
363        assert!(r.unregister(Box::new(counter.clone())).is_ok());
364        assert!(r.unregister(Box::new(counter.clone())).is_err());
365        assert!(r.register(Box::new(counter.clone())).is_ok());
366
367        let counter_vec =
368            CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
369
370        r.register(Box::new(counter_vec.clone())).unwrap();
371        counter_vec.with_label_values(&["1", "2"]).inc();
372    }
373
374    #[test]
375    fn test_default_registry() {
376        let counter = Counter::new("test", "test help").unwrap();
377
378        assert!(register(Box::new(counter.clone())).is_ok());
379        assert_ne!(gather().len(), 0);
380        assert_ne!(default_registry().gather().len(), 0);
381        assert_eq!(gather().len(), default_registry().gather().len());
382
383        assert!(unregister(Box::new(counter.clone())).is_ok());
384        assert!(unregister(Box::new(counter.clone())).is_err());
385        assert!(default_registry()
386            .unregister(Box::new(counter.clone()))
387            .is_err());
388        assert!(register(Box::new(counter.clone())).is_ok());
389    }
390
391    #[test]
392    fn test_gather_order() {
393        let r = Registry::new();
394
395        let counter_a = Counter::new("test_a_counter", "test help").unwrap();
396        let counter_b = Counter::new("test_b_counter", "test help").unwrap();
397        let counter_2 = Counter::new("test_2_counter", "test help").unwrap();
398        r.register(Box::new(counter_b.clone())).unwrap();
399        r.register(Box::new(counter_2.clone())).unwrap();
400        r.register(Box::new(counter_a.clone())).unwrap();
401
402        let mfs = r.gather();
403        assert_eq!(mfs.len(), 3);
404        assert_eq!(mfs[0].get_name(), "test_2_counter");
405        assert_eq!(mfs[1].get_name(), "test_a_counter");
406        assert_eq!(mfs[2].get_name(), "test_b_counter");
407
408        let r = Registry::new();
409        let opts = Opts::new("test", "test help")
410            .const_label("a", "1")
411            .const_label("b", "2");
412        let counter_vec = CounterVec::new(opts, &["cc", "c1", "a2", "c0"]).unwrap();
413        r.register(Box::new(counter_vec.clone())).unwrap();
414
415        let mut map1 = HashMap::new();
416        map1.insert("cc", "12");
417        map1.insert("c1", "a1");
418        map1.insert("a2", "0");
419        map1.insert("c0", "hello");
420        counter_vec.with(&map1).inc();
421
422        let mut map2 = HashMap::new();
423        map2.insert("cc", "12");
424        map2.insert("c1", "0");
425        map2.insert("a2", "0");
426        map2.insert("c0", "hello");
427        counter_vec.with(&map2).inc();
428        counter_vec.with(&map2).inc();
429
430        let mut map3 = HashMap::new();
431        map3.insert("cc", "12");
432        map3.insert("c1", "0");
433        map3.insert("a2", "da");
434        map3.insert("c0", "hello");
435        counter_vec.with(&map3).inc();
436        counter_vec.with(&map3).inc();
437        counter_vec.with(&map3).inc();
438
439        let mut map4 = HashMap::new();
440        map4.insert("cc", "12");
441        map4.insert("c1", "0");
442        map4.insert("a2", "da");
443        map4.insert("c0", "你好");
444        counter_vec.with(&map4).inc();
445        counter_vec.with(&map4).inc();
446        counter_vec.with(&map4).inc();
447        counter_vec.with(&map4).inc();
448
449        // # HELP test test help
450        // # TYPE test counter
451        // test{a="1",a2="0",b="2",c0="hello",c1="0",cc="12"} 2
452        // test{a="1",a2="0",b="2",c0="hello",c1="a1",cc="12"} 1
453        // test{a="1",a2="da",b="2",c0="hello",c1="0",cc="12"} 3
454        // test{a="1",a2="da",b="2",c0="你好",c1="0",cc="12"} 4
455
456        let mfs = r.gather();
457        assert_eq!(mfs.len(), 1);
458        let ms = mfs[0].get_metric();
459        assert_eq!(ms.len(), 4);
460        assert_eq!(ms[0].get_counter().get_value() as u64, 2);
461        assert_eq!(ms[1].get_counter().get_value() as u64, 1);
462        assert_eq!(ms[2].get_counter().get_value() as u64, 3);
463        assert_eq!(ms[3].get_counter().get_value() as u64, 4);
464    }
465
466    #[test]
467    fn test_with_prefix_gather() {
468        assert!(Registry::new_custom(Some("".to_string()), None).is_err());
469
470        let r = Registry::new_custom(Some("common_prefix".to_string()), None).unwrap();
471        let counter_a = Counter::new("test_a_counter", "test help").unwrap();
472        r.register(Box::new(counter_a.clone())).unwrap();
473
474        let mfs = r.gather();
475        assert_eq!(mfs.len(), 1);
476        assert_eq!(mfs[0].get_name(), "common_prefix_test_a_counter");
477    }
478
479    #[test]
480    fn test_with_labels_gather() {
481        let mut labels = HashMap::new();
482        labels.insert("tkey".to_string(), "tvalue".to_string());
483
484        let r = Registry::new_custom(None, Some(labels)).unwrap();
485        let counter_a = Counter::new("test_a_counter", "test help").unwrap();
486        r.register(Box::new(counter_a.clone())).unwrap();
487        let counter_vec =
488            CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
489        r.register(Box::new(counter_vec.clone())).unwrap();
490
491        counter_vec.with_label_values(&["one", "two"]).inc();
492        counter_vec.with_label_values(&["three", "four"]).inc();
493
494        let mfs = r.gather();
495        assert_eq!(mfs.len(), 2);
496        assert_eq!(mfs[0].get_name(), "test_a_counter");
497        assert_eq!(mfs[1].get_name(), "test_vec");
498
499        let mut needle = proto::LabelPair::default();
500        needle.set_name("tkey".to_string());
501        needle.set_value("tvalue".to_string());
502        let metrics = mfs[0].get_metric();
503        for m in metrics {
504            assert!(m.get_label().contains(&needle));
505        }
506        let metrics = mfs[1].get_metric();
507        for m in metrics {
508            assert!(m.get_label().contains(&needle));
509        }
510    }
511
512    struct MultipleCollector {
513        descs: Vec<Desc>,
514        counters: Vec<Counter>,
515    }
516
517    impl Collector for MultipleCollector {
518        fn desc(&self) -> Vec<&Desc> {
519            self.descs.iter().collect()
520        }
521
522        fn collect(&self) -> Vec<proto::MetricFamily> {
523            self.counters
524                .iter()
525                .inspect(|c| c.inc())
526                .map(|c| c.collect())
527                .fold(Vec::new(), |mut acc, mfs| {
528                    acc.extend(mfs);
529                    acc
530                })
531        }
532    }
533
534    #[test]
535    fn test_register_multiplecollector() {
536        let counters = vec![
537            Counter::new("c1", "c1 is a counter").unwrap(),
538            Counter::new("c2", "c2 is a counter").unwrap(),
539        ];
540
541        let descs = counters.iter().map(|c| c.desc().into_iter().cloned()).fold(
542            Vec::new(),
543            |mut acc, ds| {
544                acc.extend(ds);
545                acc
546            },
547        );
548
549        let mc = MultipleCollector { descs, counters };
550
551        let r = Registry::new();
552        r.register(Box::new(mc)).unwrap();
553    }
554
555    #[test]
556    fn test_prune_empty_metric_family() {
557        let counter_vec =
558            CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
559        let r = Registry::new();
560        r.register(Box::new(counter_vec.clone())).unwrap();
561        assert!(r.gather().is_empty());
562        counter_vec.with_label_values(&["1", "2"]).inc();
563        assert!(!r.gather().is_empty());
564    }
565}