mz_metrics/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Internal metrics libraries for Materialize.
17
18#![warn(missing_docs, missing_debug_implementations)]
19
20use std::time::Duration;
21
22use mz_dyncfg::{ConfigSet, ConfigUpdates};
23use mz_ore::metrics::MetricsRegistry;
24use tokio::time::Interval;
25
26pub use dyncfgs::all_dyncfgs;
27
28mod dyncfgs;
29pub mod lgalloc;
30pub mod rusage;
31
32/// Handle to metrics defined in this crate.
33#[derive(Debug)]
34pub struct Metrics {
35    config_set: ConfigSet,
36    lgalloc: MetricsTask,
37    rusage: MetricsTask,
38}
39
40static METRICS: std::sync::Mutex<Option<Metrics>> = std::sync::Mutex::new(None);
41
42/// Register all metrics into the provided registry.
43///
44/// We do not recommend calling this function multiple times. It is safe to call this function,
45/// but it might delete previous metrics. If we ever want to change this, we should
46/// remove the shared static mutex and make this function return a handle to the metrics.
47///
48/// This function is async, because it needs to be called from a tokio runtime context.
49#[allow(clippy::unused_async)]
50pub async fn register_metrics_into(metrics_registry: &MetricsRegistry, config_set: ConfigSet) {
51    let update_duration_metric = metrics_registry.register(mz_ore::metric!(
52        name: "mz_metrics_update_duration",
53        help: "The time it took to update lgalloc stats",
54        var_labels: ["name"],
55        buckets: mz_ore::stats::histogram_seconds_buckets(0.000_500, 32.),
56    ));
57
58    let lgalloc = Metrics::new_metrics_task(
59        metrics_registry,
60        lgalloc::register_metrics_into,
61        dyncfgs::MZ_METRICS_LGALLOC_REFRESH_INTERVAL,
62        &update_duration_metric,
63    );
64    let rusage = Metrics::new_metrics_task(
65        metrics_registry,
66        rusage::register_metrics_into,
67        dyncfgs::MZ_METRICS_RUSAGE_REFRESH_INTERVAL,
68        &update_duration_metric,
69    );
70
71    *METRICS.lock().expect("lock poisoned") = Some(Metrics {
72        lgalloc,
73        rusage,
74        config_set,
75    });
76}
77
78/// Update the configuration of the metrics.
79pub fn update_dyncfg(config_updates: &ConfigUpdates) {
80    if let Some(metrics) = METRICS.lock().expect("lock poisoned").as_mut() {
81        metrics.apply_dyncfg_updates(config_updates);
82    }
83}
84
85impl Metrics {
86    /// Update the dynamic configuration.
87    pub fn apply_dyncfg_updates(&mut self, config_updates: &ConfigUpdates) {
88        // Update the config set.
89        config_updates.apply(&self.config_set);
90        // Notify tasks about updated configuration.
91        self.lgalloc.update_dyncfg(&self.config_set);
92        self.rusage.update_dyncfg(&self.config_set);
93    }
94
95    fn new_metrics_task<T: MetricsUpdate>(
96        metrics_registry: &MetricsRegistry,
97        constructor: impl FnOnce(&MetricsRegistry) -> T,
98        interval_config: mz_dyncfg::Config<Duration>,
99        update_duration_metric: &mz_ore::metrics::HistogramVec,
100    ) -> MetricsTask {
101        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
102
103        // Start disabled.
104        let mut interval: Option<Interval> = None;
105
106        let update_duration_metric =
107            update_duration_metric.get_delete_on_drop_metric(&[T::NAME][..]);
108
109        let mut metrics = constructor(metrics_registry);
110
111        let mut update_metrics = move || {
112            tracing::debug!(metrics = T::NAME, "updating metrics");
113            let start = std::time::Instant::now();
114            if let Err(err) = metrics.update() {
115                tracing::error!(metrics = T::NAME, ?err, "metrics update failed");
116            }
117            let elapsed = start.elapsed();
118            update_duration_metric.observe(elapsed.as_secs_f64());
119        };
120
121        let update_interval = |new_interval, interval: &mut Option<Interval>| {
122            // Zero duration disables ticking.
123            if new_interval == Duration::ZERO {
124                *interval = None;
125                return;
126            }
127            // Prevent no-op changes.
128            if Some(new_interval) == interval.as_ref().map(Interval::period) {
129                return;
130            }
131            tracing::debug!(metrics = T::NAME, ?new_interval, "updating interval");
132            let mut new_interval = tokio::time::interval(new_interval);
133            new_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
134            *interval = Some(new_interval);
135        };
136
137        mz_ore::task::spawn(|| format!("mz_metrics_update({})", T::NAME), async move {
138            loop {
139                tokio::select! {
140                    _ = async { interval.as_mut().unwrap().tick().await }, if interval.is_some() => {
141                        update_metrics()
142                    }
143                    new_interval = rx.recv() => match new_interval {
144                        Some(new_interval) => update_interval(new_interval, &mut interval),
145                        None => break,
146                    }
147                }
148            }
149        });
150
151        MetricsTask {
152            tx,
153            interval_config,
154        }
155    }
156}
157
/// A family of metrics that can be refreshed on demand.
///
/// Implementations are moved into a spawned background task, hence the
/// `Send + Sync + 'static` bounds.
pub trait MetricsUpdate: Send + Sync + 'static {
    /// Error reported when an update fails; it is only ever logged via `Debug`.
    type Error: std::fmt::Debug;
    /// Human-readable name of the metrics family, used in log fields, the task name, and
    /// as the label value of the update-duration histogram.
    const NAME: &'static str;
    /// Refresh the metrics.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the metrics could not be updated.
    fn update(&mut self) -> Result<(), Self::Error>;
}
167
168#[derive(Debug)]
169struct MetricsTask {
170    interval_config: mz_dyncfg::Config<Duration>,
171    tx: tokio::sync::mpsc::UnboundedSender<Duration>,
172}
173
174impl MetricsTask {
175    pub(crate) fn update_dyncfg(&self, config_set: &ConfigSet) {
176        self.tx
177            .send(self.interval_config.get(config_set))
178            .expect("Receiver exists");
179    }
180}