Skip to main content

mz_orchestratord/
metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::time::Duration;
12
13use axum::{Extension, Router, body::Body, routing::get};
14use http::{HeaderMap, Method, Request, Response, StatusCode};
15use prometheus::{Encoder, TextEncoder};
16use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
17use tracing::{Level, Span};
18
19use mz_ore::metric;
20use mz_ore::metrics::{MetricsRegistry, UIntGauge};
21
22#[derive(Debug)]
23pub struct Metrics {
24    pub environmentd_needs_update: UIntGauge,
25}
26
27impl Metrics {
28    pub fn register_into(registry: &MetricsRegistry) -> Self {
29        Self {
30            environmentd_needs_update: registry.register(
31                metric! {
32                    name: "environmentd_needs_update",
33                    help: "Count of organizations in this cluster which are running outdated pod templates",
34                }),
35        }
36    }
37}
38
39pub fn router(registry: MetricsRegistry) -> Router {
40    add_tracing_layer(
41        Router::new()
42            .route("/metrics", get(metrics))
43            .layer(Extension(registry)),
44    )
45}
46
47#[allow(clippy::unused_async)]
48async fn metrics(Extension(registry): Extension<MetricsRegistry>) -> (StatusCode, Vec<u8>) {
49    let mut buf = vec![];
50    let encoder = TextEncoder::new();
51    let metric_families = registry.gather();
52    encoder.encode(&metric_families, &mut buf).unwrap();
53    (StatusCode::OK, buf)
54}
55
56///   Adds a tracing layer that reports an `INFO` level span per
57///   request and reports a `WARN` event when a handler returns a
58///   server error to the given Axum Router
59///
60///   This accepts a router instead of returning a layer itself
61///   to avoid dealing with defining generics over a bunch of closures
62///   (see <https://users.rust-lang.org/t/how-to-encapsulate-a-builder-that-depends-on-a-closure/71139/6>)
63///
64///   And this also can't be returned as a Router::new()::layer(TraceLayer)...
65///   because the TraceLayer needs to be added to a Router after
66///   all routes are defined, as it won't trace any routes defined
67///   on the router after it's attached.
68fn add_tracing_layer<S>(router: Router<S>) -> Router<S>
69where
70    S: Clone + Send + Sync + 'static,
71{
72    router.layer(
73        TraceLayer::new_for_http()
74            .make_span_with(|request: &Request<Body>| {
75                // This ugly macro is needed, unfortunately (and
76                // copied from tower-http), because
77                // `tracing::span!` required the level argument to
78                // be static. Meaning we can't just pass
79                // `self.level`.
80                macro_rules! make_span {
81                        ($level:expr) => {
82                            tracing::span!(
83                                $level,
84                                "HTTP request",
85                                "request.uri" = %request.uri(),
86                                "request.version" = ?request.version(),
87                                "request.method" = %request.method(),
88                                "request.headers" = tracing::field::Empty,
89                                "response.status" = tracing::field::Empty,
90                                "response.status_code" = tracing::field::Empty,
91                                "response.headers" = tracing::field::Empty,
92                            )
93                        }
94                    }
95                let span = if ["/api/health", "/metrics"].contains(&request.uri().path())
96                    || request.method() == Method::OPTIONS
97                {
98                    make_span!(Level::DEBUG)
99                } else {
100                    make_span!(Level::INFO)
101                };
102
103                if let Ok(s) = serde_json::to_string(&display_headers(request.headers().clone())) {
104                    span.record("request.headers", s);
105                }
106
107                span
108            })
109            .on_response(|response: &Response<Body>, _latency, span: &Span| {
110                span.record(
111                    "response.status",
112                    &tracing::field::display(response.status()),
113                );
114                span.record("response.status_code", response.status().as_u16());
115                if let Ok(s) = serde_json::to_string(&display_headers(response.headers().clone())) {
116                    span.record("response.headers", s);
117                }
118
119                // Emit an event at the same level as the span. For the same reason as noted in the comment
120                // above we can't use `tracing::event!(dynamic_level, ...)` since the level argument
121                // needs to be static
122                if span
123                    .metadata()
124                    .and_then(|m| Some(m.level()))
125                    .unwrap_or(&Level::DEBUG)
126                    == &Level::DEBUG
127                {
128                    tracing::debug!("HTTP response generated");
129                } else {
130                    tracing::info!("HTTP response generated");
131                }
132            })
133            .on_failure(
134                |error: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
135                    tracing::warn!(error = ?error, "HTTP request handling error");
136                },
137            ),
138    )
139}
140
141fn display_headers(mut headers: HeaderMap) -> BTreeMap<String, String> {
142    // Don't log Authorization headers
143    _ = headers.remove(http::header::AUTHORIZATION);
144
145    headers
146        .into_iter()
147        .filter_map(|(k, v)| {
148            k.map(|k| {
149                (
150                    k.to_string(),
151                    String::from_utf8_lossy(v.as_bytes()).to_string(),
152                )
153            })
154        })
155        .collect()
156}