1use std::io;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16use std::time::UNIX_EPOCH;
17
18use mz_ore::metric;
19use mz_ore::metrics::{DeleteOnDropGauge, MetricsRegistry, UIntGaugeVec};
20use prometheus::core::AtomicU64;
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22use tracing::error;
23
/// Observation hooks for traffic flowing over a connection.
///
/// `Out` is the type of outgoing messages and `In` the type of incoming messages. The byte-level
/// hooks are driven by the [`Reader`] and [`Writer`] wrappers in this module; the message-level
/// hooks are not called from the wrappers here (presumably by the framing layer — confirm at the
/// call sites).
///
/// The `Clone + Send + 'static` bounds allow one instance to be cloned and moved into
/// independently owned reader/writer halves.
pub trait Metrics<Out, In>: Clone + Send + 'static {
    /// Called after `num_bytes` (> 0) bytes were accepted by the underlying writer.
    fn bytes_sent(&mut self, num_bytes: usize);

    /// Called after `num_bytes` (> 0) bytes were read from the underlying reader.
    fn bytes_received(&mut self, num_bytes: usize);

    /// Called for an outgoing `message`.
    fn message_sent(&mut self, message: &Out);

    /// Called for an incoming `message`.
    fn message_received(&mut self, message: &In);
}
35
/// A metrics sink that ignores every event.
///
/// Being a zero-sized unit struct, it is trivially `Copy`; `Debug` and `Default` are derived so it
/// can be printed and constructed generically like the other metrics types in this module.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopMetrics;
39
40impl<Out, In> Metrics<Out, In> for NoopMetrics {
41 fn bytes_sent(&mut self, _len: usize) {}
42 fn bytes_received(&mut self, _len: usize) {}
43 fn message_sent(&mut self, _msg: &Out) {}
44 fn message_received(&mut self, _msg: &In) {}
45}
46
/// Cluster-server metrics registered with a [`MetricsRegistry`] (see `register_with`).
///
/// Per-server handles are obtained with `for_server`.
pub struct ClusterServerMetrics {
    /// Gauge vector labelled by `server_name`, holding the time (in seconds since the Unix
    /// epoch) at which the server last received data from the controller.
    last_command_received: UIntGaugeVec,
}
51
52impl ClusterServerMetrics {
53 pub fn register_with(registry: &MetricsRegistry) -> Self {
55 Self {
56 last_command_received: registry.register(metric!(
57 name: "mz_cluster_server_last_command_received",
58 help: "The time (in seconds since the Unix epoch) at which the server last \
59 received data from the controller, including CTP keepalives. Used to \
60 detect controller connections that are no longer reachable.",
61 var_labels: ["server_name"],
62 )),
63 }
64 }
65
66 pub fn for_server(&self, name: &'static str) -> PerClusterServerMetrics {
68 PerClusterServerMetrics {
69 last_command_received: self
70 .last_command_received
71 .get_delete_on_drop_metric(vec![name]),
72 }
73 }
74}
75
/// Metrics for a single cluster server, created by `ClusterServerMetrics::for_server`.
#[derive(Clone, Debug)]
pub struct PerClusterServerMetrics {
    /// This server's series of `mz_cluster_server_last_command_received`, in seconds since the
    /// Unix epoch. NOTE(review): the type name suggests the labelled series is removed from the
    /// registry on drop — confirm the exact semantics (e.g. with clones) against `mz_ore`.
    last_command_received: DeleteOnDropGauge<AtomicU64, Vec<&'static str>>,
}
81
82impl<Out, In> Metrics<Out, In> for PerClusterServerMetrics {
83 fn bytes_sent(&mut self, _len: usize) {}
84
85 fn bytes_received(&mut self, _len: usize) {
86 match UNIX_EPOCH.elapsed() {
95 Ok(ts) => self.last_command_received.set(ts.as_secs()),
96 Err(e) => error!("failed to get system time: {e}"),
97 }
98 }
99
100 fn message_sent(&mut self, _msg: &Out) {}
101 fn message_received(&mut self, _msg: &In) {}
102}
103
/// An async reader wrapper that reports the number of bytes read from `reader` to `metrics`
/// (see its `AsyncRead` impl).
#[pin_project::pin_project]
pub(super) struct Reader<R, M, Out, In> {
    /// The wrapped reader; marked `#[pin]` so it can be polled through the pin projection.
    #[pin]
    reader: R,
    /// Receives a `bytes_received` event for each read that fills bytes.
    metrics: M,
    /// Carries the `Out`/`In` type parameters needed by the `Metrics<Out, In>` bound without
    /// storing any values of them.
    _phantom: PhantomData<(Out, In)>,
}
112
113impl<R, M, Out, In> Reader<R, M, Out, In> {
114 pub fn new(reader: R, metrics: M) -> Self {
115 Self {
116 reader,
117 metrics,
118 _phantom: PhantomData,
119 }
120 }
121}
122
123impl<R, M, Out, In> AsyncRead for Reader<R, M, Out, In>
124where
125 R: AsyncRead,
126 M: Metrics<Out, In>,
127{
128 fn poll_read(
129 self: Pin<&mut Self>,
130 cx: &mut Context<'_>,
131 buf: &mut ReadBuf<'_>,
132 ) -> Poll<io::Result<()>> {
133 let initial_len = buf.filled().len();
134
135 let this = self.project();
136 let poll = this.reader.poll_read(cx, buf);
137
138 let len = buf.filled().len() - initial_len;
139 if len > 0 {
140 this.metrics.bytes_received(len);
141 }
142
143 poll
144 }
145}
146
/// An async writer wrapper that reports the number of bytes accepted by `writer` to `metrics`
/// (see its `AsyncWrite` impl).
#[pin_project::pin_project]
pub(super) struct Writer<W, M, Out, In> {
    /// The wrapped writer; marked `#[pin]` so it can be polled through the pin projection.
    #[pin]
    writer: W,
    /// Receives a `bytes_sent` event for each write that accepts bytes.
    metrics: M,
    /// Carries the `Out`/`In` type parameters needed by the `Metrics<Out, In>` bound without
    /// storing any values of them.
    _phantom: PhantomData<(Out, In)>,
}
155
156impl<W, M, Out, In> Writer<W, M, Out, In> {
157 pub fn new(writer: W, metrics: M) -> Self {
158 Self {
159 writer,
160 metrics,
161 _phantom: PhantomData,
162 }
163 }
164}
165
166impl<W, M, Out, In> AsyncWrite for Writer<W, M, Out, In>
167where
168 W: AsyncWrite,
169 M: Metrics<Out, In>,
170{
171 fn poll_write(
172 self: Pin<&mut Self>,
173 cx: &mut Context<'_>,
174 buf: &[u8],
175 ) -> Poll<Result<usize, io::Error>> {
176 let this = self.project();
177 let poll = this.writer.poll_write(cx, buf);
178
179 if let Poll::Ready(Ok(len)) = &poll
180 && *len > 0
181 {
182 this.metrics.bytes_sent(*len);
183 }
184
185 poll
186 }
187
188 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
189 self.project().writer.poll_flush(cx)
190 }
191
192 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
193 self.project().writer.poll_shutdown(cx)
194 }
195}