1#![warn(missing_docs, missing_debug_implementations)]
19
20use std::time::Duration;
21
22use mz_dyncfg::{ConfigSet, ConfigUpdates};
23use mz_ore::metrics::MetricsRegistry;
24use tokio::time::Interval;
25
26pub use dyncfgs::all_dyncfgs;
27
28mod dyncfgs;
29pub mod lgalloc;
30pub mod rusage;
31
32#[derive(Debug)]
34pub struct Metrics {
35 config_set: ConfigSet,
36 lgalloc: MetricsTask,
37 lgalloc_map: MetricsTask,
38 rusage: MetricsTask,
39}
40
41static METRICS: std::sync::Mutex<Option<Metrics>> = std::sync::Mutex::new(None);
42
43#[allow(clippy::unused_async)]
51pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_set: ConfigSet) {
52 let update_duration_metric = metrics_registry.register(mz_ore::metric!(
53 name: "mz_metrics_update_duration",
54 help: "The time it took to update lgalloc stats",
55 var_labels: ["name"],
56 buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
57 ));
58
59 let lgalloc = Metrics::new_metrics_task(
60 metrics_registry,
61 lgalloc::register_metrics_into,
62 dyncfgs::MZ_METRICS_LGALLOC_REFRESH_INTERVAL,
63 &update_duration_metric,
64 );
65 let lgalloc_map = Metrics::new_metrics_task(
66 metrics_registry,
67 lgalloc::register_map_metrics_into,
68 dyncfgs::MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL,
69 &update_duration_metric,
70 );
71 let rusage = Metrics::new_metrics_task(
72 metrics_registry,
73 rusage::register_metrics_into,
74 dyncfgs::MZ_METRICS_RUSAGE_REFRESH_INTERVAL,
75 &update_duration_metric,
76 );
77
78 *METRICS.lock().expect("lock poisoned") = Some(Metrics {
79 lgalloc,
80 lgalloc_map,
81 rusage,
82 config_set,
83 });
84}
85
86pub fn describe_metrics() -> Vec<(String, String, Vec<String>, &'static str)> {
96 let registry = MetricsRegistry::new();
97
98 let tag = |descs: Vec<(String, String, Vec<String>)>, src: &'static str| {
99 descs
100 .into_iter()
101 .map(move |(name, help, labels)| (name, help, labels, src))
102 };
103
104 let mut out = Vec::new();
105 out.extend(tag(
106 lgalloc::register_metrics_into(®istry).descs(),
107 lgalloc::SOURCE,
108 ));
109 out.extend(tag(
110 lgalloc::register_map_metrics_into(®istry).descs(),
111 lgalloc::SOURCE,
112 ));
113 out.extend(tag(
114 rusage::register_metrics_into(®istry).descs(),
115 rusage::SOURCE,
116 ));
117 out
118}
119
120pub(crate) fn desc_labels(desc: &prometheus::core::Desc) -> Vec<String> {
122 let mut labels: Vec<String> = desc
123 .variable_labels
124 .iter()
125 .cloned()
126 .chain(desc.const_label_pairs.iter().map(|p| p.name().to_owned()))
127 .collect();
128 labels.sort();
129 labels.dedup();
130 labels
131}
132
133pub fn update_dyncfg(config_updates: &ConfigUpdates) {
135 if let Some(metrics) = METRICS.lock().expect("lock poisoned").as_mut() {
136 metrics.apply_dyncfg_updates(config_updates);
137 }
138}
139
140impl Metrics {
141 pub fn apply_dyncfg_updates(&mut self, config_updates: &ConfigUpdates) {
143 config_updates.apply(&self.config_set);
145 self.lgalloc.update_dyncfg(&self.config_set);
147 self.lgalloc_map.update_dyncfg(&self.config_set);
148 self.rusage.update_dyncfg(&self.config_set);
149 }
150
151 fn new_metrics_task<T: MetricsUpdate>(
152 metrics_registry: &MetricsRegistry,
153 constructor: impl FnOnce(&MetricsRegistry) -> T,
154 interval_config: mz_dyncfg::Config<Duration>,
155 update_duration_metric: &mz_ore::metrics::HistogramVec,
156 ) -> MetricsTask {
157 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
158
159 let mut interval: Option<Interval> = None;
161
162 let update_duration_metric =
163 update_duration_metric.get_delete_on_drop_metric(&[T::NAME][..]);
164
165 let mut metrics = constructor(metrics_registry);
166
167 let mut update_metrics = move || {
168 tracing::debug!(metrics = T::NAME, "updating metrics");
169 let start = std::time::Instant::now();
170 if let Err(err) = metrics.update() {
171 tracing::error!(metrics = T::NAME, ?err, "metrics update failed");
172 }
173 let elapsed = start.elapsed();
174 update_duration_metric.observe(elapsed.as_secs_f64());
175 };
176
177 let update_interval = |new_interval, interval: &mut Option<Interval>| {
178 if new_interval == Duration::ZERO {
180 *interval = None;
181 return;
182 }
183 if Some(new_interval) == interval.as_ref().map(Interval::period) {
185 return;
186 }
187 tracing::debug!(metrics = T::NAME, ?new_interval, "updating interval");
188 let mut new_interval = tokio::time::interval(new_interval);
189 new_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
190 *interval = Some(new_interval);
191 };
192
193 mz_ore::task::spawn(|| format!("mz_metrics_update({})", T::NAME), async move {
194 loop {
195 tokio::select! {
196 _ = async {
197 interval.as_mut().unwrap().tick().await
198 }, if interval.is_some() => {
199 update_metrics()
200 }
201 new_interval = rx.recv() => match new_interval {
202 Some(new_interval) => update_interval(new_interval, &mut interval),
203 None => break,
204 }
205 }
206 }
207 });
208
209 MetricsTask {
210 tx,
211 interval_config,
212 }
213 }
214}
215
/// A group of metrics that can refresh its own values on demand.
pub trait MetricsUpdate: Send + Sync + 'static {
    /// Identifies this group in log events, in the refresh task's name, and as the `name`
    /// label of the update-duration histogram.
    const NAME: &'static str;

    /// Error produced by a failed refresh; it is logged using its `Debug` representation.
    type Error: std::fmt::Debug;

    /// Refreshes the metric values.
    fn update(&mut self) -> Result<(), Self::Error>;
}
225
226#[derive(Debug)]
227struct MetricsTask {
228 interval_config: mz_dyncfg::Config<Duration>,
229 tx: tokio::sync::mpsc::UnboundedSender<Duration>,
230}
231
232impl MetricsTask {
233 pub(crate) fn update_dyncfg(&self, config_set: &ConfigSet) {
234 self.tx
235 .send(self.interval_config.get(config_set))
236 .expect("Receiver exists");
237 }
238}