mz_orchestratord/
metrics.rs1use std::time::Duration;
11
12use axum::{Extension, Router, body::Body, routing::get};
13use http::{Method, Request, Response, StatusCode};
14use prometheus::{Encoder, TextEncoder};
15use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
16use tracing::{Level, Span};
17
18use mz_ore::metric;
19use mz_ore::metrics::{MetricsRegistry, UIntGauge};
20
21#[derive(Debug)]
22pub struct Metrics {
23 pub environmentd_needs_update: UIntGauge,
24}
25
26impl Metrics {
27 pub fn register_into(registry: &MetricsRegistry) -> Self {
28 Self {
29 environmentd_needs_update: registry.register(
30 metric! {
31 name: "environmentd_needs_update",
32 help: "Count of organizations in this cluster which are running outdated pod templates",
33 }),
34 }
35 }
36}
37
38pub fn router(registry: MetricsRegistry) -> Router {
39 add_tracing_layer(
40 Router::new()
41 .route("/metrics", get(metrics))
42 .layer(Extension(registry)),
43 )
44}
45
46#[allow(clippy::unused_async)]
47async fn metrics(Extension(registry): Extension<MetricsRegistry>) -> (StatusCode, Vec<u8>) {
48 let mut buf = vec![];
49 let encoder = TextEncoder::new();
50 let metric_families = registry.gather();
51 encoder.encode(&metric_families, &mut buf).unwrap();
52 (StatusCode::OK, buf)
53}
54
55fn add_tracing_layer<S>(router: Router<S>) -> Router<S>
68where
69 S: Clone + Send + Sync + 'static,
70{
71 router.layer(TraceLayer::new_for_http()
72 .make_span_with(|request: &Request<Body>| {
73 let mut headers = request.headers().clone();
80 _ = headers.remove(http::header::AUTHORIZATION);
81 macro_rules! make_span {
82 ($level:expr) => {
83 tracing::span!(
84 $level,
85 "HTTP request",
86 "request.uri" = %request.uri(),
87 "request.version" = ?request.version(),
88 "request.method" = %request.method(),
89 "request.headers" = ?headers,
90 "response.status" = tracing::field::Empty,
91 "response.status_code" = tracing::field::Empty,
92 "response.headers" = tracing::field::Empty,
93 )
94 }
95 }
96 if request.uri().path() == "/api/health" || request.method() == Method::OPTIONS {
97 return make_span!(Level::DEBUG);
98 }
99 make_span!(Level::INFO)
100 })
101 .on_response(|response: &Response<Body>, _latency, span: &Span| {
102 span.record(
103 "response.status",
104 &tracing::field::display(response.status()),
105 );
106 span.record(
107 "response.status_code",
108 &tracing::field::display(response.status().as_u16()),
109 );
110 span.record(
111 "response.headers",
112 &tracing::field::debug(response.headers()),
113 );
114 if span.metadata().and_then(|m| Some(m.level())).unwrap_or(&Level::DEBUG) == &Level::DEBUG {
118 tracing::debug!(msg = "HTTP response generated", response = ?response, status_code = response.status().as_u16());
119 } else {
120 tracing::info!(msg = "HTTP response generated", response = ?response, status_code = response.status().as_u16());
121 }
122 })
123 .on_failure(
124 |error: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
125 tracing::warn!(msg = "HTTP request handling error", error = ?error);
126 },
127 ))
128}