Skip to main content

mz_environmentd/http/
metrics_public.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Federated `/metrics` endpoint.
11//!
12//! Aggregates environmentd's local metrics with every clusterd replica's
13//! `/metrics` output, adding `cluster_id`, `replica_id`, `process`,
14//! `cluster_name`, and `replica_name` labels onto the clusterd-sourced
15//! metrics.
16
17use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::body::Body;
22use axum::response::{IntoResponse, Response};
23use axum_extra::TypedHeader;
24use futures::future::join_all;
25use headers::ContentType;
26use http::{Method, Request, StatusCode};
27use http_body_util::BodyExt;
28use mz_adapter_types::dyncfgs::ENABLE_PUBLIC_METRICS_ENDPOINT;
29use mz_controller_types::{ClusterId, ReplicaId};
30use mz_ore::metrics::MetricsRegistry;
31use prometheus::Encoder;
32
33use crate::http::AuthedClient;
34use crate::http::cluster::{
35    ClusterProxyConfig, proxy_replica_request, rewrite_request_for_replica,
36};
37
38/// The customer-facing `/metrics` endpoint.
39///
40/// Aggregates environmentd's local metrics with every clusterd replica's
41/// `/metrics` output.
42pub(crate) async fn handle_public_metrics(
43    client: AuthedClient,
44    Extension(config): Extension<Arc<ClusterProxyConfig>>,
45    Extension(metrics_registry): Extension<MetricsRegistry>,
46) -> Response {
47    let catalog = client.client.catalog_snapshot("metrics_public").await;
48    if !ENABLE_PUBLIC_METRICS_ENDPOINT.get(catalog.system_config().dyncfgs()) {
49        return StatusCode::SERVICE_UNAVAILABLE.into_response();
50    }
51
52    // Start with env's local gather() output.
53    let mut all_families: Vec<prometheus::proto::MetricFamily> = metrics_registry.gather();
54
55    // Fan out to every (cluster, replica, process).
56    let scrapes: Vec<(ClusterId, ReplicaId, usize, String)> = config
57        .locator
58        .list_replicas()
59        .into_iter()
60        .flat_map(|(cluster_id, replica_id, addrs)| {
61            addrs
62                .into_iter()
63                .enumerate()
64                .map(move |(process_id, addr)| (cluster_id, replica_id, process_id, addr))
65        })
66        .collect();
67
68    let results = join_all(scrapes.into_iter().map(scrape_replica_metrics_endpoint)).await;
69
70    for (cluster_id, replica_id, process, result) in results {
71        let bytes = match result {
72            Ok(b) => b,
73            Err(e) => {
74                tracing::warn!(
75                    %cluster_id,
76                    %replica_id,
77                    process,
78                    "federated metrics scrape failed: {e}"
79                );
80                continue;
81            }
82        };
83        let cluster_name = catalog
84            .try_get_cluster(cluster_id)
85            .map(|cluster| cluster.name.clone());
86        let replica_name = catalog
87            .try_get_cluster_replica(cluster_id, replica_id)
88            .map(|replica| replica.name.clone());
89        let families = match mz_prometheus_protobuf::decode_length_delimited(&bytes) {
90            Ok(f) => f,
91            Err(e) => {
92                tracing::warn!(
93                    %cluster_id,
94                    %replica_id,
95                    process,
96                    "federated metrics decode failed: {e}"
97                );
98                continue;
99            }
100        };
101        for mut family in families {
102            add_replica_labels(
103                &mut family,
104                cluster_id,
105                replica_id,
106                process,
107                cluster_name.as_deref(),
108                replica_name.as_deref(),
109            );
110            all_families.push(family);
111        }
112    }
113
114    // env and clusterd both register many of the same metric names
115    // and every replica scrape may also redeclare names. The
116    // text encoder emits one `# HELP`/`# TYPE` block per `MetricFamily`, so
117    // without this we'd produce duplicate header lines and scrapers would
118    // reject the response.
119    let all_families = merge_families_by_name(all_families);
120
121    // Re-emit as Prometheus text.
122    let mut body = Vec::new();
123    if let Err(e) = prometheus::TextEncoder::new().encode(&all_families, &mut body) {
124        return (
125            StatusCode::INTERNAL_SERVER_ERROR,
126            format!("failed to encode metrics: {e}"),
127        )
128            .into_response();
129    }
130    (TypedHeader(ContentType::text()), body).into_response()
131}
132
133async fn scrape_replica_metrics_endpoint(
134    (cluster_id, replica_id, process, addr): (ClusterId, ReplicaId, usize, String),
135) -> (ClusterId, ReplicaId, usize, Result<Vec<u8>, String>) {
136    let result =
137        scrape_replica_metrics_endpoint_inner(cluster_id, replica_id, process, &addr).await;
138    (cluster_id, replica_id, process, result)
139}
140
141async fn scrape_replica_metrics_endpoint_inner(
142    cluster_id: ClusterId,
143    replica_id: ReplicaId,
144    process: usize,
145    addr: &str,
146) -> Result<Vec<u8>, String> {
147    let mut req = Request::builder()
148        .method(Method::GET)
149        .uri("/metrics")
150        .header(
151            http::header::ACCEPT,
152            mz_http_util::PROMETHEUS_PROTOBUF_CONTENT_TYPE,
153        )
154        .body(Body::empty())
155        .map_err(|e| format!("building request: {e}"))?;
156    rewrite_request_for_replica(&mut req, addr, cluster_id, replica_id, process, "/metrics")
157        .map_err(|(status, msg)| format!("{status}: {msg}"))?;
158    let resp = proxy_replica_request(addr, req)
159        .await
160        .map_err(|(status, msg)| format!("{status}: {msg}"))?;
161    if !resp.status().is_success() {
162        return Err(format!("upstream status {}", resp.status()));
163    }
164    let body = resp.into_body();
165    let collected = body
166        .collect()
167        .await
168        .map_err(|e| format!("collecting body: {e}"))?
169        .to_bytes();
170
171    Ok(collected.to_vec())
172}
173
174fn add_replica_labels(
175    family: &mut prometheus::proto::MetricFamily,
176    cluster_id: ClusterId,
177    replica_id: ReplicaId,
178    process: usize,
179    cluster_name: Option<&str>,
180    replica_name: Option<&str>,
181) {
182    for metric in family.mut_metric() {
183        let mut labels = metric.take_label();
184        labels.push(label_pair("cluster_id", cluster_id.to_string()));
185        labels.push(label_pair("replica_id", replica_id.to_string()));
186        labels.push(label_pair("process", process.to_string()));
187        if let Some(n) = cluster_name {
188            labels.push(label_pair("cluster_name", n.to_owned()));
189        }
190        if let Some(n) = replica_name {
191            labels.push(label_pair("replica_name", n.to_owned()));
192        }
193        metric.set_label(labels);
194    }
195}
196
197/// Merges `MetricFamily` entries that share a name into a single entry, keeping
198/// the first occurrence's metadata (help text, type) and appending every
199/// subsequent same-named family's metrics into it.
200fn merge_families_by_name(
201    families: Vec<prometheus::proto::MetricFamily>,
202) -> Vec<prometheus::proto::MetricFamily> {
203    let mut merged_families: Vec<prometheus::proto::MetricFamily> =
204        Vec::with_capacity(families.len());
205    let mut first_family_index_by_name: BTreeMap<String, usize> = BTreeMap::new();
206    for mut family in families {
207        let name = family.name().to_owned();
208        if let Some(&index) = first_family_index_by_name.get(&name) {
209            for metric in family.take_metric() {
210                merged_families[index].mut_metric().push(metric);
211            }
212        } else {
213            first_family_index_by_name.insert(name, merged_families.len());
214            merged_families.push(family);
215        }
216    }
217    merged_families
218}
219
220fn label_pair(name: &str, value: String) -> prometheus::proto::LabelPair {
221    let mut pair = prometheus::proto::LabelPair::default();
222    pair.set_name(name.to_owned());
223    pair.set_value(value);
224    pair
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a counter family called `name` holding a single unlabeled
    /// metric whose value is 1.
    fn make_family(name: &str) -> prometheus::proto::MetricFamily {
        let mut counter = prometheus::proto::Counter::default();
        counter.set_value(1.0);
        let mut metric = prometheus::proto::Metric::default();
        metric.set_counter(counter);

        let mut family = prometheus::proto::MetricFamily::default();
        family.set_name(name.to_owned());
        family.set_help(format!("help for {name}"));
        family.set_field_type(prometheus::proto::MetricType::COUNTER);
        family.set_metric(vec![metric]);
        family
    }

    /// Returns the `(name, value)` pairs on the first metric of `family`, in
    /// label order.
    fn first_metric_labels(family: &prometheus::proto::MetricFamily) -> Vec<(&str, &str)> {
        family.get_metric()[0]
            .get_label()
            .iter()
            .map(|label| (label.name(), label.value()))
            .collect()
    }

    /// Without catalog names only the three id labels are attached.
    #[mz_ore::test]
    fn adds_three_labels_when_names_unknown() {
        let mut family = make_family("test");
        let cluster_id: ClusterId = "u1".parse().expect("cluster id");
        let replica_id: ReplicaId = "u2".parse().expect("replica id");

        add_replica_labels(&mut family, cluster_id, replica_id, 7, None, None);

        let labels = first_metric_labels(&family);
        let names: Vec<_> = labels.iter().map(|(name, _)| *name).collect();
        let values: Vec<_> = labels.iter().map(|(_, value)| *value).collect();
        assert_eq!(names, vec!["cluster_id", "replica_id", "process"]);
        assert_eq!(values, vec!["u1", "u2", "7"]);
    }

    /// With both names available all five labels are attached, ids first.
    #[mz_ore::test]
    fn adds_five_labels_when_names_known() {
        let mut family = make_family("test");
        let cluster_id: ClusterId = "u1".parse().expect("cluster id");
        let replica_id: ReplicaId = "u2".parse().expect("replica id");

        add_replica_labels(
            &mut family,
            cluster_id,
            replica_id,
            0,
            Some("quickstart"),
            Some("r1"),
        );

        let labels = first_metric_labels(&family);
        let names: Vec<_> = labels.iter().map(|(name, _)| *name).collect();
        let values: Vec<_> = labels.iter().map(|(_, value)| *value).collect();
        assert_eq!(
            names,
            vec![
                "cluster_id",
                "replica_id",
                "process",
                "cluster_name",
                "replica_name",
            ]
        );
        assert_eq!(values, vec!["u1", "u2", "0", "quickstart", "r1"]);
    }

    /// A name registered by both env and a replica must be encoded with
    /// exactly one `# HELP` and one `# TYPE` line after merging.
    #[mz_ore::test]
    fn shared_metric_name_emits_single_help_and_type() {
        let env_family = make_family("mz_persist_test");
        let mut clusterd_family = make_family("mz_persist_test");
        add_replica_labels(
            &mut clusterd_family,
            "u1".parse().unwrap(),
            "u2".parse().unwrap(),
            0,
            Some("quickstart"),
            Some("r1"),
        );

        let merged = merge_families_by_name(vec![env_family, clusterd_family]);
        let mut body = Vec::new();
        prometheus::TextEncoder::new()
            .encode(&merged, &mut body)
            .unwrap();
        let text = String::from_utf8(body).unwrap();

        let count_lines = |prefix: &str| {
            text.lines()
                .filter(|line| line.starts_with(prefix))
                .count()
        };
        assert_eq!(count_lines("# HELP mz_persist_test "), 1, "{text}");
        assert_eq!(count_lines("# TYPE mz_persist_test "), 1, "{text}");
    }
}