mz_metrics/
lib.rs
1#![warn(missing_docs, missing_debug_implementations)]
19
20use std::time::Duration;
21
22use mz_dyncfg::{ConfigSet, ConfigUpdates};
23use mz_ore::metrics::MetricsRegistry;
24use tokio::time::Interval;
25
26pub use dyncfgs::all_dyncfgs;
27
28mod dyncfgs;
29pub mod lgalloc;
30pub mod rusage;
31
32#[derive(Debug)]
34pub struct Metrics {
35 config_set: ConfigSet,
36 lgalloc: MetricsTask,
37 rusage: MetricsTask,
38}
39
40static METRICS: std::sync::Mutex<Option<Metrics>> = std::sync::Mutex::new(None);
41
42#[allow(clippy::unused_async)]
50pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_set: ConfigSet) {
51 let update_duration_metric = metrics_registry.register(mz_ore::metric!(
52 name: "mz_metrics_update_duration",
53 help: "The time it took to update lgalloc stats",
54 var_labels: ["name"],
55 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
56 ));
57
58 let lgalloc = Metrics::new_metrics_task(
59 metrics_registry,
60 lgalloc::register_metrics_into,
61 dyncfgs::MZ_METRICS_LGALLOC_REFRESH_INTERVAL,
62 &update_duration_metric,
63 );
64 let rusage = Metrics::new_metrics_task(
65 metrics_registry,
66 rusage::register_metrics_into,
67 dyncfgs::MZ_METRICS_RUSAGE_REFRESH_INTERVAL,
68 &update_duration_metric,
69 );
70
71 *METRICS.lock().expect("lock poisoned") = Some(Metrics {
72 lgalloc,
73 rusage,
74 config_set,
75 });
76}
77
78pub fn update_dyncfg(config_updates: &ConfigUpdates) {
80 if let Some(metrics) = METRICS.lock().expect("lock poisoned").as_mut() {
81 metrics.apply_dyncfg_updates(config_updates);
82 }
83}
84
85impl Metrics {
86 pub fn apply_dyncfg_updates(&mut self, config_updates: &ConfigUpdates) {
88 config_updates.apply(&self.config_set);
90 self.lgalloc.update_dyncfg(&self.config_set);
92 self.rusage.update_dyncfg(&self.config_set);
93 }
94
95 fn new_metrics_task<T: MetricsUpdate>(
96 metrics_registry: &MetricsRegistry,
97 constructor: impl FnOnce(&MetricsRegistry) -> T,
98 interval_config: mz_dyncfg::Config<Duration>,
99 update_duration_metric: &mz_ore::metrics::HistogramVec,
100 ) -> MetricsTask {
101 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
102
103 let mut interval: Option<Interval> = None;
105
106 let update_duration_metric =
107 update_duration_metric.get_delete_on_drop_metric(&[T::NAME][..]);
108
109 let mut metrics = constructor(metrics_registry);
110
111 let mut update_metrics = move || {
112 tracing::debug!(metrics = T::NAME, "updating metrics");
113 let start = std::time::Instant::now();
114 if let Err(err) = metrics.update() {
115 tracing::error!(metrics = T::NAME, ?err, "metrics update failed");
116 }
117 let elapsed = start.elapsed();
118 update_duration_metric.observe(elapsed.as_secs_f64());
119 };
120
121 let update_interval = |new_interval, interval: &mut Option<Interval>| {
122 if new_interval == Duration::ZERO {
124 *interval = None;
125 return;
126 }
127 if Some(new_interval) == interval.as_ref().map(Interval::period) {
129 return;
130 }
131 tracing::debug!(metrics = T::NAME, ?new_interval, "updating interval");
132 let mut new_interval = tokio::time::interval(new_interval);
133 new_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
134 *interval = Some(new_interval);
135 };
136
137 mz_ore::task::spawn(|| format!("mz_metrics_update({})", T::NAME), async move {
138 loop {
139 tokio::select! {
140 _ = async { interval.as_mut().unwrap().tick().await }, if interval.is_some() => {
141 update_metrics()
142 }
143 new_interval = rx.recv() => match new_interval {
144 Some(new_interval) => update_interval(new_interval, &mut interval),
145 None => break,
146 }
147 }
148 }
149 });
150
151 MetricsTask {
152 tx,
153 interval_config,
154 }
155 }
156}
157
/// A set of metrics that can be refreshed on demand by a background task.
pub trait MetricsUpdate: Send + Sync + 'static {
    /// Name identifying this set of metrics.
    const NAME: &'static str;

    /// Error produced by a failed [`update`](MetricsUpdate::update); it only needs to be
    /// printable with `Debug`.
    type Error: std::fmt::Debug;

    /// Refreshes the metrics.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the metrics could not be refreshed.
    fn update(&mut self) -> Result<(), Self::Error>;
}
167
168#[derive(Debug)]
169struct MetricsTask {
170 interval_config: mz_dyncfg::Config<Duration>,
171 tx: tokio::sync::mpsc::UnboundedSender<Duration>,
172}
173
174impl MetricsTask {
175 pub(crate) fn update_dyncfg(&self, config_set: &ConfigSet) {
176 self.tx
177 .send(self.interval_config.get(config_set))
178 .expect("Receiver exists");
179 }
180}