Skip to main content

mz_service/transport/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics support for the Cluster Transport Protocol.
11
12use std::io;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16use std::time::UNIX_EPOCH;
17
18use mz_ore::metric;
19use mz_ore::metrics::{DeleteOnDropGauge, MetricsRegistry, UIntGaugeVec};
20use prometheus::core::AtomicU64;
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22use tracing::error;
23
/// Observer interface for metric events on a connection.
///
/// `Out` is the type of the messages reported through [`Metrics::message_sent`] and `In` the
/// type of the messages reported through [`Metrics::message_received`]. Implementors must be
/// [`Clone`], [`Send`], and `'static`.
pub trait Metrics<Out, In>: Clone + Send + 'static {
    /// Invoked with the number of bytes that were sent.
    fn bytes_sent(&mut self, len: usize);
    /// Invoked with the number of bytes that were received.
    fn bytes_received(&mut self, len: usize);
    /// Invoked with a message that was sent.
    fn message_sent(&mut self, msg: &Out);
    /// Invoked with a message that was received.
    fn message_received(&mut self, msg: &In);
}
35
36/// No-op [`Metrics`] implementation that ignores all events.
37#[derive(Clone)]
38pub struct NoopMetrics;
39
40impl<Out, In> Metrics<Out, In> for NoopMetrics {
41    fn bytes_sent(&mut self, _len: usize) {}
42    fn bytes_received(&mut self, _len: usize) {}
43    fn message_sent(&mut self, _msg: &Out) {}
44    fn message_received(&mut self, _msg: &In) {}
45}
46
47/// Metrics for a cluster (CTP) server.
48pub struct ClusterServerMetrics {
49    last_command_received: UIntGaugeVec,
50}
51
52impl ClusterServerMetrics {
53    /// Registers the cluster server metrics into a `registry`.
54    pub fn register_with(registry: &MetricsRegistry) -> Self {
55        Self {
56            last_command_received: registry.register(metric!(
57                name: "mz_cluster_server_last_command_received",
58                help: "The time (in seconds since the Unix epoch) at which the server last \
59                       received data from the controller, including CTP keepalives. Used to \
60                       detect controller connections that are no longer reachable.",
61                var_labels: ["server_name"],
62            )),
63        }
64    }
65
66    /// Returns the metrics for the server with the given name.
67    pub fn for_server(&self, name: &'static str) -> PerClusterServerMetrics {
68        PerClusterServerMetrics {
69            last_command_received: self
70                .last_command_received
71                .get_delete_on_drop_metric(vec![name]),
72        }
73    }
74}
75
76/// Metrics for a single named cluster (CTP) server.
77#[derive(Clone, Debug)]
78pub struct PerClusterServerMetrics {
79    last_command_received: DeleteOnDropGauge<AtomicU64, Vec<&'static str>>,
80}
81
82impl<Out, In> Metrics<Out, In> for PerClusterServerMetrics {
83    fn bytes_sent(&mut self, _len: usize) {}
84
85    fn bytes_received(&mut self, _len: usize) {
86        // We bump the "last command received" timestamp on any inbound bytes, not just on fully
87        // decoded command messages (`message_received`). CTP exchanges keepalives on otherwise
88        // idle connections, and those show up here but never as messages (empty keepalive frames
89        // are dropped before decoding). Counting them as activity makes this metric a
90        // connection-liveness signal: it stays fresh as long as the controller connection is
91        // healthy and only goes stale once the connection is actually broken. Bumping it only on
92        // command messages instead produces false positives on healthy-but-idle clusters that
93        // don't receive a command for a while.
94        match UNIX_EPOCH.elapsed() {
95            Ok(ts) => self.last_command_received.set(ts.as_secs()),
96            Err(e) => error!("failed to get system time: {e}"),
97        }
98    }
99
100    fn message_sent(&mut self, _msg: &Out) {}
101    fn message_received(&mut self, _msg: &In) {}
102}
103
104/// [`AsyncRead`] wrapper that transparently logs `bytes_received` metrics.
105#[pin_project::pin_project]
106pub(super) struct Reader<R, M, Out, In> {
107    #[pin]
108    reader: R,
109    metrics: M,
110    _phantom: PhantomData<(Out, In)>,
111}
112
113impl<R, M, Out, In> Reader<R, M, Out, In> {
114    pub fn new(reader: R, metrics: M) -> Self {
115        Self {
116            reader,
117            metrics,
118            _phantom: PhantomData,
119        }
120    }
121}
122
123impl<R, M, Out, In> AsyncRead for Reader<R, M, Out, In>
124where
125    R: AsyncRead,
126    M: Metrics<Out, In>,
127{
128    fn poll_read(
129        self: Pin<&mut Self>,
130        cx: &mut Context<'_>,
131        buf: &mut ReadBuf<'_>,
132    ) -> Poll<io::Result<()>> {
133        let initial_len = buf.filled().len();
134
135        let this = self.project();
136        let poll = this.reader.poll_read(cx, buf);
137
138        let len = buf.filled().len() - initial_len;
139        if len > 0 {
140            this.metrics.bytes_received(len);
141        }
142
143        poll
144    }
145}
146
147/// [`AsyncWrite`] wrapper that transparently logs `bytes_sent` metrics.
148#[pin_project::pin_project]
149pub(super) struct Writer<W, M, Out, In> {
150    #[pin]
151    writer: W,
152    metrics: M,
153    _phantom: PhantomData<(Out, In)>,
154}
155
156impl<W, M, Out, In> Writer<W, M, Out, In> {
157    pub fn new(writer: W, metrics: M) -> Self {
158        Self {
159            writer,
160            metrics,
161            _phantom: PhantomData,
162        }
163    }
164}
165
166impl<W, M, Out, In> AsyncWrite for Writer<W, M, Out, In>
167where
168    W: AsyncWrite,
169    M: Metrics<Out, In>,
170{
171    fn poll_write(
172        self: Pin<&mut Self>,
173        cx: &mut Context<'_>,
174        buf: &[u8],
175    ) -> Poll<Result<usize, io::Error>> {
176        let this = self.project();
177        let poll = this.writer.poll_write(cx, buf);
178
179        if let Poll::Ready(Ok(len)) = &poll
180            && *len > 0
181        {
182            this.metrics.bytes_sent(*len);
183        }
184
185        poll
186    }
187
188    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
189        self.project().writer.poll_flush(cx)
190    }
191
192    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
193        self.project().writer.poll_shutdown(cx)
194    }
195}