1use std::collections::BTreeMap;
18use std::sync::Arc;
19
20use axum::Extension;
21use axum::body::Body;
22use axum::response::{IntoResponse, Response};
23use axum_extra::TypedHeader;
24use futures::future::join_all;
25use headers::ContentType;
26use http::{Method, Request, StatusCode};
27use http_body_util::BodyExt;
28use mz_adapter_types::dyncfgs::ENABLE_PUBLIC_METRICS_ENDPOINT;
29use mz_controller_types::{ClusterId, ReplicaId};
30use mz_ore::metrics::MetricsRegistry;
31use prometheus::Encoder;
32
33use crate::http::AuthedClient;
34use crate::http::cluster::{
35 ClusterProxyConfig, proxy_replica_request, rewrite_request_for_replica,
36};
37
38pub(crate) async fn handle_public_metrics(
43 client: AuthedClient,
44 Extension(config): Extension<Arc<ClusterProxyConfig>>,
45 Extension(metrics_registry): Extension<MetricsRegistry>,
46) -> Response {
47 let catalog = client.client.catalog_snapshot("metrics_public").await;
48 if !ENABLE_PUBLIC_METRICS_ENDPOINT.get(catalog.system_config().dyncfgs()) {
49 return StatusCode::SERVICE_UNAVAILABLE.into_response();
50 }
51
52 let mut all_families: Vec<prometheus::proto::MetricFamily> = metrics_registry.gather();
54
55 let scrapes: Vec<(ClusterId, ReplicaId, usize, String)> = config
57 .locator
58 .list_replicas()
59 .into_iter()
60 .flat_map(|(cluster_id, replica_id, addrs)| {
61 addrs
62 .into_iter()
63 .enumerate()
64 .map(move |(process_id, addr)| (cluster_id, replica_id, process_id, addr))
65 })
66 .collect();
67
68 let results = join_all(scrapes.into_iter().map(scrape_replica_metrics_endpoint)).await;
69
70 for (cluster_id, replica_id, process, result) in results {
71 let bytes = match result {
72 Ok(b) => b,
73 Err(e) => {
74 tracing::warn!(
75 %cluster_id,
76 %replica_id,
77 process,
78 "federated metrics scrape failed: {e}"
79 );
80 continue;
81 }
82 };
83 let cluster_name = catalog
84 .try_get_cluster(cluster_id)
85 .map(|cluster| cluster.name.clone());
86 let replica_name = catalog
87 .try_get_cluster_replica(cluster_id, replica_id)
88 .map(|replica| replica.name.clone());
89 let families = match mz_prometheus_protobuf::decode_length_delimited(&bytes) {
90 Ok(f) => f,
91 Err(e) => {
92 tracing::warn!(
93 %cluster_id,
94 %replica_id,
95 process,
96 "federated metrics decode failed: {e}"
97 );
98 continue;
99 }
100 };
101 for mut family in families {
102 add_replica_labels(
103 &mut family,
104 cluster_id,
105 replica_id,
106 process,
107 cluster_name.as_deref(),
108 replica_name.as_deref(),
109 );
110 all_families.push(family);
111 }
112 }
113
114 let all_families = merge_families_by_name(all_families);
120
121 let mut body = Vec::new();
123 if let Err(e) = prometheus::TextEncoder::new().encode(&all_families, &mut body) {
124 return (
125 StatusCode::INTERNAL_SERVER_ERROR,
126 format!("failed to encode metrics: {e}"),
127 )
128 .into_response();
129 }
130 (TypedHeader(ContentType::text()), body).into_response()
131}
132
133async fn scrape_replica_metrics_endpoint(
134 (cluster_id, replica_id, process, addr): (ClusterId, ReplicaId, usize, String),
135) -> (ClusterId, ReplicaId, usize, Result<Vec<u8>, String>) {
136 let result =
137 scrape_replica_metrics_endpoint_inner(cluster_id, replica_id, process, &addr).await;
138 (cluster_id, replica_id, process, result)
139}
140
141async fn scrape_replica_metrics_endpoint_inner(
142 cluster_id: ClusterId,
143 replica_id: ReplicaId,
144 process: usize,
145 addr: &str,
146) -> Result<Vec<u8>, String> {
147 let mut req = Request::builder()
148 .method(Method::GET)
149 .uri("/metrics")
150 .header(
151 http::header::ACCEPT,
152 mz_http_util::PROMETHEUS_PROTOBUF_CONTENT_TYPE,
153 )
154 .body(Body::empty())
155 .map_err(|e| format!("building request: {e}"))?;
156 rewrite_request_for_replica(&mut req, addr, cluster_id, replica_id, process, "/metrics")
157 .map_err(|(status, msg)| format!("{status}: {msg}"))?;
158 let resp = proxy_replica_request(addr, req)
159 .await
160 .map_err(|(status, msg)| format!("{status}: {msg}"))?;
161 if !resp.status().is_success() {
162 return Err(format!("upstream status {}", resp.status()));
163 }
164 let body = resp.into_body();
165 let collected = body
166 .collect()
167 .await
168 .map_err(|e| format!("collecting body: {e}"))?
169 .to_bytes();
170
171 Ok(collected.to_vec())
172}
173
174fn add_replica_labels(
175 family: &mut prometheus::proto::MetricFamily,
176 cluster_id: ClusterId,
177 replica_id: ReplicaId,
178 process: usize,
179 cluster_name: Option<&str>,
180 replica_name: Option<&str>,
181) {
182 for metric in family.mut_metric() {
183 let mut labels = metric.take_label();
184 labels.push(label_pair("cluster_id", cluster_id.to_string()));
185 labels.push(label_pair("replica_id", replica_id.to_string()));
186 labels.push(label_pair("process", process.to_string()));
187 if let Some(n) = cluster_name {
188 labels.push(label_pair("cluster_name", n.to_owned()));
189 }
190 if let Some(n) = replica_name {
191 labels.push(label_pair("replica_name", n.to_owned()));
192 }
193 metric.set_label(labels);
194 }
195}
196
197fn merge_families_by_name(
201 families: Vec<prometheus::proto::MetricFamily>,
202) -> Vec<prometheus::proto::MetricFamily> {
203 let mut merged_families: Vec<prometheus::proto::MetricFamily> =
204 Vec::with_capacity(families.len());
205 let mut first_family_index_by_name: BTreeMap<String, usize> = BTreeMap::new();
206 for mut family in families {
207 let name = family.name().to_owned();
208 if let Some(&index) = first_family_index_by_name.get(&name) {
209 for metric in family.take_metric() {
210 merged_families[index].mut_metric().push(metric);
211 }
212 } else {
213 first_family_index_by_name.insert(name, merged_families.len());
214 merged_families.push(family);
215 }
216 }
217 merged_families
218}
219
220fn label_pair(name: &str, value: String) -> prometheus::proto::LabelPair {
221 let mut pair = prometheus::proto::LabelPair::default();
222 pair.set_name(name.to_owned());
223 pair.set_value(value);
224 pair
225}
226
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a counter family called `name` holding one unlabeled metric
    /// with value 1.
    fn make_family(name: &str) -> prometheus::proto::MetricFamily {
        let mut counter = prometheus::proto::Counter::default();
        counter.set_value(1.0);

        let mut metric = prometheus::proto::Metric::default();
        metric.set_counter(counter);

        let mut family = prometheus::proto::MetricFamily::default();
        family.set_name(name.to_owned());
        family.set_help(format!("help for {name}"));
        family.set_field_type(prometheus::proto::MetricType::COUNTER);
        family.set_metric(vec![metric]);
        family
    }

    /// Label names of the first metric in `family`, in emission order.
    fn label_names(family: &prometheus::proto::MetricFamily) -> Vec<&str> {
        family.get_metric()[0]
            .get_label()
            .iter()
            .map(|label| label.name())
            .collect()
    }

    /// Label values of the first metric in `family`, in emission order.
    fn label_values(family: &prometheus::proto::MetricFamily) -> Vec<&str> {
        family.get_metric()[0]
            .get_label()
            .iter()
            .map(|label| label.value())
            .collect()
    }

    /// Number of lines in `text` that begin with `prefix`.
    fn lines_starting_with(text: &str, prefix: &str) -> usize {
        text.lines().filter(|line| line.starts_with(prefix)).count()
    }

    #[mz_ore::test]
    fn adds_three_labels_when_names_unknown() {
        let cluster_id: ClusterId = "u1".parse().expect("cluster id");
        let replica_id: ReplicaId = "u2".parse().expect("replica id");
        let mut family = make_family("test");

        add_replica_labels(&mut family, cluster_id, replica_id, 7, None, None);

        assert_eq!(
            label_names(&family),
            vec!["cluster_id", "replica_id", "process"]
        );
        assert_eq!(label_values(&family), vec!["u1", "u2", "7"]);
    }

    #[mz_ore::test]
    fn shared_metric_name_emits_single_help_and_type() {
        let env_family = make_family("mz_persist_test");
        let mut clusterd_family = make_family("mz_persist_test");
        add_replica_labels(
            &mut clusterd_family,
            "u1".parse().unwrap(),
            "u2".parse().unwrap(),
            0,
            Some("quickstart"),
            Some("r1"),
        );

        let merged = merge_families_by_name(vec![env_family, clusterd_family]);
        let mut encoded = Vec::new();
        prometheus::TextEncoder::new()
            .encode(&merged, &mut encoded)
            .unwrap();
        let text = String::from_utf8(encoded).unwrap();

        assert_eq!(
            lines_starting_with(&text, "# HELP mz_persist_test "),
            1,
            "{text}"
        );
        assert_eq!(
            lines_starting_with(&text, "# TYPE mz_persist_test "),
            1,
            "{text}"
        );
    }

    #[mz_ore::test]
    fn adds_five_labels_when_names_known() {
        let cluster_id: ClusterId = "u1".parse().expect("cluster id");
        let replica_id: ReplicaId = "u2".parse().expect("replica id");
        let mut family = make_family("test");

        add_replica_labels(
            &mut family,
            cluster_id,
            replica_id,
            0,
            Some("quickstart"),
            Some("r1"),
        );

        assert_eq!(
            label_names(&family),
            vec![
                "cluster_id",
                "replica_id",
                "process",
                "cluster_name",
                "replica_name",
            ]
        );
        assert_eq!(
            label_values(&family),
            vec!["u1", "u2", "0", "quickstart", "r1"]
        );
    }
}