Skip to main content

mz_metrics/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Internal metrics libraries for Materialize.
17
18#![warn(missing_docs, missing_debug_implementations)]
19
20use std::time::Duration;
21
22use mz_dyncfg::{ConfigSet, ConfigUpdates};
23use mz_ore::metrics::MetricsRegistry;
24use tokio::time::Interval;
25
26pub use dyncfgs::all_dyncfgs;
27
28mod dyncfgs;
29pub mod lgalloc;
30pub mod rusage;
31
/// Handle to metrics defined in this crate.
///
/// Owns the dynamic configuration plus one handle per background update task.
/// Each handle owns its task's channel sender, so dropping this value makes
/// every task observe a closed channel and exit.
#[derive(Debug)]
pub struct Metrics {
    /// Configuration that dyncfg updates are applied to before each task's
    /// refresh interval is re-read from it.
    config_set: ConfigSet,
    /// Task refreshing the lgalloc metrics.
    lgalloc: MetricsTask,
    /// Task refreshing the lgalloc map metrics.
    lgalloc_map: MetricsTask,
    /// Task refreshing the rusage metrics.
    rusage: MetricsTask,
}
40
/// Process-wide metrics handle, installed by `register_metrics_into` and
/// reconfigured by `update_dyncfg`. It is `None` until registration has run.
static METRICS: std::sync::Mutex<Option<Metrics>> = std::sync::Mutex::new(None);
42
43/// Register all metrics into the provided registry.
44///
45/// We do not recommend calling this function multiple times. It is safe to call this function,
46/// but it might delete previous metrics. If we ever want to change this, we should
47/// remove the shared static mutex and make this function return a handle to the metrics.
48///
49/// This function is async, because it needs to be called from a tokio runtime context.
50#[allow(clippy::unused_async)]
51pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_set: ConfigSet) {
52    let update_duration_metric = metrics_registry.register(mz_ore::metric!(
53        name: "mz_metrics_update_duration",
54        help: "The time it took to update lgalloc stats",
55        var_labels: ["name"],
56        buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
57    ));
58
59    let lgalloc = Metrics::new_metrics_task(
60        metrics_registry,
61        lgalloc::register_metrics_into,
62        dyncfgs::MZ_METRICS_LGALLOC_REFRESH_INTERVAL,
63        &update_duration_metric,
64    );
65    let lgalloc_map = Metrics::new_metrics_task(
66        metrics_registry,
67        lgalloc::register_map_metrics_into,
68        dyncfgs::MZ_METRICS_LGALLOC_MAP_REFRESH_INTERVAL,
69        &update_duration_metric,
70    );
71    let rusage = Metrics::new_metrics_task(
72        metrics_registry,
73        rusage::register_metrics_into,
74        dyncfgs::MZ_METRICS_RUSAGE_REFRESH_INTERVAL,
75        &update_duration_metric,
76    );
77
78    *METRICS.lock().expect("lock poisoned") = Some(Metrics {
79        lgalloc,
80        lgalloc_map,
81        rusage,
82        config_set,
83    });
84}
85
86/// Returns the `(name, help, labels, source)` of every metric this crate
87/// registers through a `metric!`-wrapping macro (lgalloc and rusage).
88///
89/// The metrics catalog (`mz-metrics-catalog`) builds the user-facing metrics
90/// docs by scraping `metric!` invocations out of the source with `syn`. These
91/// metrics are invisible to that scraper because their names are assembled at
92/// macro-expansion time, so the catalog
93/// imports them from here instead: it registers them into a throwaway registry
94/// and reads their descriptors back out.
95pub fn describe_metrics() -> Vec<(String, String, Vec<String>, &'static str)> {
96    let registry = MetricsRegistry::new();
97
98    let tag = |descs: Vec<(String, String, Vec<String>)>, src: &'static str| {
99        descs
100            .into_iter()
101            .map(move |(name, help, labels)| (name, help, labels, src))
102    };
103
104    let mut out = Vec::new();
105    out.extend(tag(
106        lgalloc::register_metrics_into(&registry).descs(),
107        lgalloc::SOURCE,
108    ));
109    out.extend(tag(
110        lgalloc::register_map_metrics_into(&registry).descs(),
111        lgalloc::SOURCE,
112    ));
113    out.extend(tag(
114        rusage::register_metrics_into(&registry).descs(),
115        rusage::SOURCE,
116    ));
117    out
118}
119
120/// Extracts a metric's label keys from its Prometheus descriptor
121pub(crate) fn desc_labels(desc: &prometheus::core::Desc) -> Vec<String> {
122    let mut labels: Vec<String> = desc
123        .variable_labels
124        .iter()
125        .cloned()
126        .chain(desc.const_label_pairs.iter().map(|p| p.name().to_owned()))
127        .collect();
128    labels.sort();
129    labels.dedup();
130    labels
131}
132
133/// Update the configuration of the metrics.
134pub fn update_dyncfg(config_updates: &ConfigUpdates) {
135    if let Some(metrics) = METRICS.lock().expect("lock poisoned").as_mut() {
136        metrics.apply_dyncfg_updates(config_updates);
137    }
138}
139
140impl Metrics {
141    /// Update the dynamic configuration.
142    pub fn apply_dyncfg_updates(&mut self, config_updates: &ConfigUpdates) {
143        // Update the config set.
144        config_updates.apply(&self.config_set);
145        // Notify tasks about updated configuration.
146        self.lgalloc.update_dyncfg(&self.config_set);
147        self.lgalloc_map.update_dyncfg(&self.config_set);
148        self.rusage.update_dyncfg(&self.config_set);
149    }
150
151    fn new_metrics_task<T: MetricsUpdate>(
152        metrics_registry: &MetricsRegistry,
153        constructor: impl FnOnce(&MetricsRegistry) -> T,
154        interval_config: mz_dyncfg::Config<Duration>,
155        update_duration_metric: &mz_ore::metrics::HistogramVec,
156    ) -> MetricsTask {
157        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
158
159        // Start disabled.
160        let mut interval: Option<Interval> = None;
161
162        let update_duration_metric =
163            update_duration_metric.get_delete_on_drop_metric(&[T::NAME][..]);
164
165        let mut metrics = constructor(metrics_registry);
166
167        let mut update_metrics = move || {
168            tracing::debug!(metrics = T::NAME, "updating metrics");
169            let start = std::time::Instant::now();
170            if let Err(err) = metrics.update() {
171                tracing::error!(metrics = T::NAME, ?err, "metrics update failed");
172            }
173            let elapsed = start.elapsed();
174            update_duration_metric.observe(elapsed.as_secs_f64());
175        };
176
177        let update_interval = |new_interval, interval: &mut Option<Interval>| {
178            // Zero duration disables ticking.
179            if new_interval == Duration::ZERO {
180                *interval = None;
181                return;
182            }
183            // Prevent no-op changes.
184            if Some(new_interval) == interval.as_ref().map(Interval::period) {
185                return;
186            }
187            tracing::debug!(metrics = T::NAME, ?new_interval, "updating interval");
188            let mut new_interval = tokio::time::interval(new_interval);
189            new_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
190            *interval = Some(new_interval);
191        };
192
193        mz_ore::task::spawn(|| format!("mz_metrics_update({})", T::NAME), async move {
194            loop {
195                tokio::select! {
196                    _ = async {
197                        interval.as_mut().unwrap().tick().await
198                    }, if interval.is_some() => {
199                        update_metrics()
200                    }
201                    new_interval = rx.recv() => match new_interval {
202                        Some(new_interval) => update_interval(new_interval, &mut interval),
203                        None => break,
204                    }
205                }
206            }
207        });
208
209        MetricsTask {
210            tx,
211            interval_config,
212        }
213    }
214}
215
/// Behavior to update metrics.
///
/// Implementors are handed to a background task (see
/// `Metrics::new_metrics_task`) that calls [`MetricsUpdate::update`] each time
/// its refresh interval elapses. The `Send`/`'static` bounds let an
/// implementor be moved into that task.
pub trait MetricsUpdate: 'static + Send + Sync {
    /// A human-readable name.
    ///
    /// Used as the `name` label on the update-duration histogram and in log
    /// messages.
    const NAME: &'static str;
    /// Error type to indicate updating failed.
    ///
    /// The background task only logs it, through its `Debug` representation.
    type Error: std::fmt::Debug;
    /// Update the metrics.
    ///
    /// # Errors
    ///
    /// Returns [`Self::Error`] if the refresh failed.
    fn update(&mut self) -> Result<(), Self::Error>;
}
225
/// Handle to a background metrics update task spawned by
/// `Metrics::new_metrics_task`.
#[derive(Debug)]
struct MetricsTask {
    /// Dyncfg holding this task's refresh interval. A zero duration disables
    /// the task.
    interval_config: mz_dyncfg::Config<Duration>,
    /// Sends new refresh intervals to the task. The task exits once this
    /// sender is dropped.
    tx: tokio::sync::mpsc::UnboundedSender<Duration>,
}
231
232impl MetricsTask {
233    pub(crate) fn update_dyncfg(&self, config_set: &ConfigSet) {
234        self.tx
235            .send(self.interval_config.get(config_set))
236            .expect("Receiver exists");
237    }
238}