mz_orchestratord/metrics.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::time::Duration;
use axum::{body::Body, routing::get, Extension, Router};
use http::{Method, Request, Response, StatusCode};
use prometheus::{Encoder, TextEncoder};
use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
use tracing::{Level, Span};
use mz_ore::metric;
use mz_ore::metrics::{MetricsRegistry, UIntGauge};
#[derive(Debug)]
pub struct Metrics {
pub needs_update: UIntGauge,
}
impl Metrics {
pub fn register_into(registry: &MetricsRegistry) -> Self {
Self {
needs_update: registry.register(
metric! {
name: "needs_update",
help: "Count of organizations in this cluster which are running outdated pod templates",
}),
}
}
}
pub fn router(registry: MetricsRegistry) -> Router {
add_tracing_layer(
Router::new()
.route("/metrics", get(metrics))
.layer(Extension(registry)),
)
}
#[allow(clippy::unused_async)]
async fn metrics(Extension(registry): Extension<MetricsRegistry>) -> (StatusCode, Vec<u8>) {
let mut buf = vec![];
let encoder = TextEncoder::new();
let metric_families = registry.gather();
encoder.encode(&metric_families, &mut buf).unwrap();
(StatusCode::OK, buf)
}
/// Adds a tracing layer that reports an `INFO` level span per
/// request and reports a `WARN` event when a handler returns a
/// server error to the given Axum Router
///
/// This accepts a router instead of returning a layer itself
/// to avoid dealing with defining generics over a bunch of closures
/// (see <https://users.rust-lang.org/t/how-to-encapsulate-a-builder-that-depends-on-a-closure/71139/6>)
///
/// And this also can't be returned as a Router::new()::layer(TraceLayer)...
/// because the TraceLayer needs to be added to a Router after
/// all routes are defined, as it won't trace any routes defined
/// on the router after it's attached.
fn add_tracing_layer<S>(router: Router<S>) -> Router<S>
where
S: Clone + Send + Sync + 'static,
{
router.layer(TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
// This ugly macro is needed, unfortunately (and
// copied from tower-http), because
// `tracing::span!` required the level argument to
// be static. Meaning we can't just pass
// `self.level`.
// Don't log Authorization headers
let mut headers = request.headers().clone();
_ = headers.remove(http::header::AUTHORIZATION);
macro_rules! make_span {
($level:expr) => {
tracing::span!(
$level,
"HTTP request",
"request.uri" = %request.uri(),
"request.version" = ?request.version(),
"request.method" = %request.method(),
"request.headers" = ?headers,
"response.status" = tracing::field::Empty,
"response.status_code" = tracing::field::Empty,
"response.headers" = tracing::field::Empty,
)
}
}
if request.uri().path() == "/api/health" || request.method() == Method::OPTIONS {
return make_span!(Level::DEBUG);
}
make_span!(Level::INFO)
})
.on_response(|response: &Response<Body>, _latency, span: &Span| {
span.record(
"response.status",
&tracing::field::display(response.status()),
);
span.record(
"response.status_code",
&tracing::field::display(response.status().as_u16()),
);
span.record(
"response.headers",
&tracing::field::debug(response.headers()),
);
// Emit an event at the same level as the span. For the same reason as noted in the comment
// above we can't use `tracing::event!(dynamic_level, ...)` since the level argument
// needs to be static
if span.metadata().and_then(|m| Some(m.level())).unwrap_or(&Level::DEBUG) == &Level::DEBUG {
tracing::debug!(msg = "HTTP response generated", response = ?response, status_code = response.status().as_u16());
} else {
tracing::info!(msg = "HTTP response generated", response = ?response, status_code = response.status().as_u16());
}
})
.on_failure(
|error: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
tracing::warn!(msg = "HTTP request handling error", error = ?error);
},
))
}