use std::collections::btree_map::Entry as BEntry;
use std::collections::hash_map::Entry as HEntry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use parking_lot::RwLock;
use crate::errors::{Error, Result};
use crate::metrics::Collector;
use crate::proto;
use cfg_if::cfg_if;
use lazy_static::lazy_static;
/// Mutable state of a registry: the registered collectors plus the bookkeeping
/// used to reject conflicting registrations. `Registry` wraps it in a lock.
#[derive(Default)]
struct RegistryCore {
/// Registered collectors, keyed by the wrapping sum of their distinct descriptor ids.
pub collectors_by_id: HashMap<u64, Box<dyn Collector>>,
/// Dimension hash recorded for each fully-qualified metric name seen by `register`.
pub dim_hashes_by_name: HashMap<String, u64>,
/// Descriptor ids of all currently registered collectors.
pub desc_ids: HashSet<u64>,
/// Optional label pairs appended to every metric by `gather`.
pub labels: Option<HashMap<String, String>>,
/// Optional namespace that `gather` prepends, joined with `_`, to every family name.
pub prefix: Option<String>,
}
impl std::fmt::Debug for RegistryCore {
    /// Writes a one-line summary that contains only the number of registered
    /// collectors; the boxed collectors themselves are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let collector_count = self.collectors_by_id.len();
        f.write_fmt(format_args!(
            "RegistryCore ({} collectors)",
            collector_count
        ))
    }
}
impl RegistryCore {
fn register(&mut self, c: Box<dyn Collector>) -> Result<()> {
let mut desc_id_set = HashSet::new();
let mut collector_id: u64 = 0;
for desc in c.desc() {
if self.desc_ids.contains(&desc.id) {
return Err(Error::AlreadyReg);
}
if let Some(hash) = self.dim_hashes_by_name.get(&desc.fq_name) {
if *hash != desc.dim_hash {
return Err(Error::Msg(format!(
"a previously registered descriptor with the \
same fully-qualified name as {:?} has \
different label names or a different help \
string",
desc
)));
}
}
self.dim_hashes_by_name
.insert(desc.fq_name.clone(), desc.dim_hash);
if desc_id_set.insert(desc.id) {
collector_id = collector_id.wrapping_add(desc.id);
} else {
return Err(Error::Msg(format!(
"a duplicate descriptor within the same \
collector the same fully-qualified name: {:?}",
desc.fq_name
)));
}
}
match self.collectors_by_id.entry(collector_id) {
HEntry::Vacant(vc) => {
self.desc_ids.extend(desc_id_set);
vc.insert(c);
Ok(())
}
HEntry::Occupied(_) => Err(Error::AlreadyReg),
}
}
fn unregister(&mut self, c: Box<dyn Collector>) -> Result<()> {
let mut id_set = Vec::new();
let mut collector_id: u64 = 0;
for desc in c.desc() {
if !id_set.iter().any(|id| *id == desc.id) {
id_set.push(desc.id);
collector_id = collector_id.wrapping_add(desc.id);
}
}
if self.collectors_by_id.remove(&collector_id).is_none() {
return Err(Error::Msg(format!(
"collector {:?} is not registered",
c.desc()
)));
}
for id in id_set {
self.desc_ids.remove(&id);
}
Ok(())
}
fn gather(&self) -> Vec<proto::MetricFamily> {
let mut mf_by_name = BTreeMap::new();
for c in self.collectors_by_id.values() {
let mfs = c.collect();
for mut mf in mfs {
if mf.get_metric().is_empty() {
continue;
}
let name = mf.get_name().to_owned();
match mf_by_name.entry(name) {
BEntry::Vacant(entry) => {
entry.insert(mf);
}
BEntry::Occupied(mut entry) => {
let existent_mf = entry.get_mut();
let existent_metrics = existent_mf.mut_metric();
for metric in mf.take_metric().into_iter() {
existent_metrics.push(metric);
}
}
}
}
}
for mf in mf_by_name.values_mut() {
mf.mut_metric().sort_by(|m1, m2| {
let lps1 = m1.get_label();
let lps2 = m2.get_label();
if lps1.len() != lps2.len() {
return lps1.len().cmp(&lps2.len());
}
for (lp1, lp2) in lps1.iter().zip(lps2.iter()) {
if lp1.get_value() != lp2.get_value() {
return lp1.get_value().cmp(lp2.get_value());
}
}
m1.get_timestamp_ms().cmp(&m2.get_timestamp_ms())
});
}
mf_by_name
.into_iter()
.map(|(_, mut m)| {
if let Some(ref namespace) = self.prefix {
let prefixed = format!("{}_{}", namespace, m.get_name());
m.set_name(prefixed);
}
if let Some(ref hmap) = self.labels {
let pairs: Vec<proto::LabelPair> = hmap
.iter()
.map(|(k, v)| {
let mut label = proto::LabelPair::default();
label.set_name(k.to_string());
label.set_value(v.to_string());
label
})
.collect();
for metric in m.mut_metric().iter_mut() {
let mut labels: Vec<_> = metric.take_label().into();
labels.append(&mut pairs.clone());
metric.set_label(labels.into());
}
}
m
})
.collect()
}
}
/// Handle to a shared `RegistryCore`.
///
/// The core lives behind an `Arc<RwLock<..>>`, so clones of a `Registry`
/// refer to the same set of registered collectors.
#[derive(Clone, Default, Debug)]
pub struct Registry {
// Shared core; `register`/`unregister` take the write lock, `gather` the read lock.
r: Arc<RwLock<RegistryCore>>,
}
impl Registry {
pub fn new() -> Registry {
Default::default()
}
pub fn new_custom(
prefix: Option<String>,
labels: Option<HashMap<String, String>>,
) -> Result<Registry> {
if let Some(ref namespace) = prefix {
if namespace.is_empty() {
return Err(Error::Msg("empty prefix namespace".to_string()));
}
}
let reg = Registry::default();
{
let mut core = reg.r.write();
core.prefix = prefix;
core.labels = labels;
}
Ok(reg)
}
pub fn register(&self, c: Box<dyn Collector>) -> Result<()> {
self.r.write().register(c)
}
pub fn unregister(&self, c: Box<dyn Collector>) -> Result<()> {
self.r.write().unregister(c)
}
pub fn gather(&self) -> Vec<proto::MetricFamily> {
self.r.read().gather()
}
}
cfg_if! {
    if #[cfg(all(feature = "process", target_os="linux"))] {
        /// Registers the collector built by `ProcessCollector::for_self()`
        /// with `reg`, returning whatever `Registry::register` returns.
        fn register_default_process_collector(reg: &Registry) -> Result<()> {
            let collector = crate::process_collector::ProcessCollector::for_self();
            reg.register(Box::new(collector))
        }
    } else {
        /// Fallback used when the `process` feature is off or the target is
        /// not Linux: registers nothing and always returns `Ok(())`.
        fn register_default_process_collector(_registry: &Registry) -> Result<()> {
            Ok(())
        }
    }
}
lazy_static! {
static ref DEFAULT_REGISTRY: Registry = {
let reg = Registry::default();
register_default_process_collector(®).unwrap();
reg
};
}
/// Returns the default registry, initializing it first if this is the first
/// access.
pub fn default_registry() -> &'static Registry {
    lazy_static::initialize(&DEFAULT_REGISTRY);
    let registry: &'static Registry = &*DEFAULT_REGISTRY;
    registry
}
pub fn register(c: Box<dyn Collector>) -> Result<()> {
DEFAULT_REGISTRY.register(c)
}
pub fn unregister(c: Box<dyn Collector>) -> Result<()> {
DEFAULT_REGISTRY.unregister(c)
}
pub fn gather() -> Vec<proto::MetricFamily> {
DEFAULT_REGISTRY.gather()
}
// Unit tests for `Registry` and the module-level default-registry helpers.
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::thread;
use super::*;
use crate::counter::{Counter, CounterVec};
use crate::desc::Desc;
use crate::metrics::{Collector, Opts};
use crate::proto;
// Register / unregister round trip on a private registry, including a gather
// performed from another thread through a cloned handle.
#[test]
fn test_registry() {
let r = Registry::new();
let counter = Counter::new("test", "test help").unwrap();
r.register(Box::new(counter.clone())).unwrap();
counter.inc();
// A clone of the registry moved into another thread sees the registered counter.
let r1 = r.clone();
let handler = thread::spawn(move || {
let metric_families = r1.gather();
assert_eq!(metric_families.len(), 1);
});
assert!(handler.join().is_ok());
// Registering twice fails; unregistering twice fails; re-registering after
// a successful unregister works again.
assert!(r.register(Box::new(counter.clone())).is_err());
assert!(r.unregister(Box::new(counter.clone())).is_ok());
assert!(r.unregister(Box::new(counter.clone())).is_err());
assert!(r.register(Box::new(counter.clone())).is_ok());
// A vector collector can be registered next to the plain counter.
let counter_vec =
CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
r.register(Box::new(counter_vec.clone())).unwrap();
counter_vec.with_label_values(&["1", "2"]).inc();
}
// The module-level helpers and `default_registry()` operate on the same registry.
#[test]
fn test_default_registry() {
let counter = Counter::new("test", "test help").unwrap();
assert!(register(Box::new(counter.clone())).is_ok());
assert_ne!(gather().len(), 0);
assert_ne!(default_registry().gather().len(), 0);
assert_eq!(gather().len(), default_registry().gather().len());
assert!(unregister(Box::new(counter.clone())).is_ok());
assert!(unregister(Box::new(counter.clone())).is_err());
assert!(default_registry()
.unregister(Box::new(counter.clone()))
.is_err());
assert!(register(Box::new(counter.clone())).is_ok());
}
// Checks the ordering of gathered families (by name) and of the metrics
// inside one family (by label values).
#[test]
fn test_gather_order() {
let r = Registry::new();
let counter_a = Counter::new("test_a_counter", "test help").unwrap();
let counter_b = Counter::new("test_b_counter", "test help").unwrap();
let counter_2 = Counter::new("test_2_counter", "test help").unwrap();
// Registration order deliberately differs from the expected name order.
r.register(Box::new(counter_b.clone())).unwrap();
r.register(Box::new(counter_2.clone())).unwrap();
r.register(Box::new(counter_a.clone())).unwrap();
let mfs = r.gather();
assert_eq!(mfs.len(), 3);
assert_eq!(mfs[0].get_name(), "test_2_counter");
assert_eq!(mfs[1].get_name(), "test_a_counter");
assert_eq!(mfs[2].get_name(), "test_b_counter");
// Second part: one counter vector with four label combinations. Each
// combination is incremented a different number of times (1 to 4) so the
// counter value identifies which combination ended up at which position.
let r = Registry::new();
let opts = Opts::new("test", "test help")
.const_label("a", "1")
.const_label("b", "2");
let counter_vec = CounterVec::new(opts, &["cc", "c1", "a2", "c0"]).unwrap();
r.register(Box::new(counter_vec.clone())).unwrap();
// Incremented once.
let mut map1 = HashMap::new();
map1.insert("cc", "12");
map1.insert("c1", "a1");
map1.insert("a2", "0");
map1.insert("c0", "hello");
counter_vec.with(&map1).inc();
// Incremented twice.
let mut map2 = HashMap::new();
map2.insert("cc", "12");
map2.insert("c1", "0");
map2.insert("a2", "0");
map2.insert("c0", "hello");
counter_vec.with(&map2).inc();
counter_vec.with(&map2).inc();
// Incremented three times.
let mut map3 = HashMap::new();
map3.insert("cc", "12");
map3.insert("c1", "0");
map3.insert("a2", "da");
map3.insert("c0", "hello");
counter_vec.with(&map3).inc();
counter_vec.with(&map3).inc();
counter_vec.with(&map3).inc();
// Incremented four times; uses a non-ASCII label value.
let mut map4 = HashMap::new();
map4.insert("cc", "12");
map4.insert("c1", "0");
map4.insert("a2", "da");
map4.insert("c0", "你好");
counter_vec.with(&map4).inc();
counter_vec.with(&map4).inc();
counter_vec.with(&map4).inc();
counter_vec.with(&map4).inc();
let mfs = r.gather();
assert_eq!(mfs.len(), 1);
let ms = mfs[0].get_metric();
assert_eq!(ms.len(), 4);
// Expected order of the combinations: map2, map1, map3, map4.
assert_eq!(ms[0].get_counter().get_value() as u64, 2);
assert_eq!(ms[1].get_counter().get_value() as u64, 1);
assert_eq!(ms[2].get_counter().get_value() as u64, 3);
assert_eq!(ms[3].get_counter().get_value() as u64, 4);
}
// An empty prefix is rejected; a non-empty prefix is joined to the family
// name with an underscore.
#[test]
fn test_with_prefix_gather() {
assert!(Registry::new_custom(Some("".to_string()), None).is_err());
let r = Registry::new_custom(Some("common_prefix".to_string()), None).unwrap();
let counter_a = Counter::new("test_a_counter", "test help").unwrap();
r.register(Box::new(counter_a.clone())).unwrap();
let mfs = r.gather();
assert_eq!(mfs.len(), 1);
assert_eq!(mfs[0].get_name(), "common_prefix_test_a_counter");
}
// Registry-wide labels must show up on every gathered metric.
#[test]
fn test_with_labels_gather() {
let mut labels = HashMap::new();
labels.insert("tkey".to_string(), "tvalue".to_string());
let r = Registry::new_custom(None, Some(labels)).unwrap();
let counter_a = Counter::new("test_a_counter", "test help").unwrap();
r.register(Box::new(counter_a.clone())).unwrap();
let counter_vec =
CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
r.register(Box::new(counter_vec.clone())).unwrap();
counter_vec.with_label_values(&["one", "two"]).inc();
counter_vec.with_label_values(&["three", "four"]).inc();
let mfs = r.gather();
assert_eq!(mfs.len(), 2);
assert_eq!(mfs[0].get_name(), "test_a_counter");
assert_eq!(mfs[1].get_name(), "test_vec");
// The label pair every metric is expected to contain.
let mut needle = proto::LabelPair::default();
needle.set_name("tkey".to_string());
needle.set_value("tvalue".to_string());
let metrics = mfs[0].get_metric();
for m in metrics {
assert!(m.get_label().contains(&needle));
}
let metrics = mfs[1].get_metric();
for m in metrics {
assert!(m.get_label().contains(&needle));
}
}
// Test collector that exposes the descriptors and metrics of several counters.
struct MultipleCollector {
descs: Vec<Desc>,
counters: Vec<Counter>,
}
impl Collector for MultipleCollector {
fn desc(&self) -> Vec<&Desc> {
self.descs.iter().collect()
}
// Increments every counter, then concatenates the families each one collects.
fn collect(&self) -> Vec<proto::MetricFamily> {
self.counters
.iter()
.inspect(|c| c.inc())
.map(|c| c.collect())
.fold(Vec::new(), |mut acc, mfs| {
acc.extend(mfs);
acc
})
}
}
// A collector reporting more than one descriptor can be registered.
#[test]
fn test_register_multiplecollector() {
let counters = vec![
Counter::new("c1", "c1 is a counter").unwrap(),
Counter::new("c2", "c2 is a counter").unwrap(),
];
// Flatten the cloned descriptors of all counters into one vector.
let descs = counters.iter().map(|c| c.desc().into_iter().cloned()).fold(
Vec::new(),
|mut acc, ds| {
acc.extend(ds);
acc
},
);
let mc = MultipleCollector { descs, counters };
let r = Registry::new();
r.register(Box::new(mc)).unwrap();
}
// A vector with no label combinations used yet yields nothing from `gather`;
// once a combination is touched, the family appears.
#[test]
fn test_prune_empty_metric_family() {
let counter_vec =
CounterVec::new(Opts::new("test_vec", "test vec help"), &["a", "b"]).unwrap();
let r = Registry::new();
r.register(Box::new(counter_vec.clone())).unwrap();
assert!(r.gather().is_empty());
counter_vec.with_label_values(&["1", "2"]).inc();
assert!(!r.gather().is_empty());
}
}