mz_orchestratord/
metrics.rs1use std::collections::BTreeMap;
11use std::time::Duration;
12
13use axum::{Extension, Router, body::Body, routing::get};
14use http::{HeaderMap, Method, Request, Response, StatusCode};
15use prometheus::{Encoder, TextEncoder};
16use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
17use tracing::{Level, Span};
18
19use mz_ore::metric;
20use mz_ore::metrics::{MetricsRegistry, UIntGauge};
21
22#[derive(Debug)]
23pub struct Metrics {
24 pub environmentd_needs_update: UIntGauge,
25}
26
27impl Metrics {
28 pub fn register_into(registry: &MetricsRegistry) -> Self {
29 Self {
30 environmentd_needs_update: registry.register(
31 metric! {
32 name: "environmentd_needs_update",
33 help: "Count of organizations in this cluster which are running outdated pod templates",
34 }),
35 }
36 }
37}
38
39pub fn router(registry: MetricsRegistry) -> Router {
40 add_tracing_layer(
41 Router::new()
42 .route("/metrics", get(metrics))
43 .layer(Extension(registry)),
44 )
45}
46
47#[allow(clippy::unused_async)]
48async fn metrics(Extension(registry): Extension<MetricsRegistry>) -> (StatusCode, Vec<u8>) {
49 let mut buf = vec![];
50 let encoder = TextEncoder::new();
51 let metric_families = registry.gather();
52 encoder.encode(&metric_families, &mut buf).unwrap();
53 (StatusCode::OK, buf)
54}
55
56fn add_tracing_layer<S>(router: Router<S>) -> Router<S>
69where
70 S: Clone + Send + Sync + 'static,
71{
72 router.layer(
73 TraceLayer::new_for_http()
74 .make_span_with(|request: &Request<Body>| {
75 macro_rules! make_span {
81 ($level:expr) => {
82 tracing::span!(
83 $level,
84 "HTTP request",
85 "request.uri" = %request.uri(),
86 "request.version" = ?request.version(),
87 "request.method" = %request.method(),
88 "request.headers" = tracing::field::Empty,
89 "response.status" = tracing::field::Empty,
90 "response.status_code" = tracing::field::Empty,
91 "response.headers" = tracing::field::Empty,
92 )
93 }
94 }
95 let span = if ["/api/health", "/metrics"].contains(&request.uri().path())
96 || request.method() == Method::OPTIONS
97 {
98 make_span!(Level::DEBUG)
99 } else {
100 make_span!(Level::INFO)
101 };
102
103 if let Ok(s) = serde_json::to_string(&display_headers(request.headers().clone())) {
104 span.record("request.headers", s);
105 }
106
107 span
108 })
109 .on_response(|response: &Response<Body>, _latency, span: &Span| {
110 span.record(
111 "response.status",
112 &tracing::field::display(response.status()),
113 );
114 span.record("response.status_code", response.status().as_u16());
115 if let Ok(s) = serde_json::to_string(&display_headers(response.headers().clone())) {
116 span.record("response.headers", s);
117 }
118
119 if span
123 .metadata()
124 .and_then(|m| Some(m.level()))
125 .unwrap_or(&Level::DEBUG)
126 == &Level::DEBUG
127 {
128 tracing::debug!("HTTP response generated");
129 } else {
130 tracing::info!("HTTP response generated");
131 }
132 })
133 .on_failure(
134 |error: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
135 tracing::warn!(error = ?error, "HTTP request handling error");
136 },
137 ),
138 )
139}
140
141fn display_headers(mut headers: HeaderMap) -> BTreeMap<String, String> {
142 _ = headers.remove(http::header::AUTHORIZATION);
144
145 headers
146 .into_iter()
147 .filter_map(|(k, v)| {
148 k.map(|k| {
149 (
150 k.to_string(),
151 String::from_utf8_lossy(v.as_bytes()).to_string(),
152 )
153 })
154 })
155 .collect()
156}