mz_service/transport/
metrics.rs1use std::io;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18
/// Hooks for observing traffic on a transport connection.
///
/// `Out` is the type of messages handed to [`Metrics::message_sent`] and `In`
/// the type handed to [`Metrics::message_received`]. Implementors must be
/// `Clone + Send + 'static`.
pub trait Metrics<Out, In>: Clone + Send + 'static {
    /// Records that `len` bytes were written to the underlying writer.
    fn bytes_sent(&mut self, len: usize);

    /// Records that `len` bytes were read from the underlying reader.
    fn bytes_received(&mut self, len: usize);

    /// Records an outbound message.
    ///
    /// NOTE(review): the call site is not in this file; presumably invoked
    /// once per message sent by the transport — confirm against the caller.
    fn message_sent(&mut self, msg: &Out);

    /// Records an inbound message.
    ///
    /// NOTE(review): the call site is not in this file; presumably invoked
    /// once per message received by the transport — confirm against the caller.
    fn message_received(&mut self, msg: &In);
}
30
/// A metrics sink that carries no state.
///
/// Useful wherever a metrics value is required but nothing should be
/// recorded. `Debug` and `Default` are derived so the type can be logged and
/// constructed generically like any other unit type.
#[derive(Clone, Debug, Default)]
pub struct NoopMetrics;
34
35impl<Out, In> Metrics<Out, In> for NoopMetrics {
36 fn bytes_sent(&mut self, _len: usize) {}
37 fn bytes_received(&mut self, _len: usize) {}
38 fn message_sent(&mut self, _msg: &Out) {}
39 fn message_received(&mut self, _msg: &In) {}
40}
41
42#[pin_project::pin_project]
44pub(super) struct Reader<R, M, Out, In> {
45 #[pin]
46 reader: R,
47 metrics: M,
48 _phantom: PhantomData<(Out, In)>,
49}
50
51impl<R, M, Out, In> Reader<R, M, Out, In> {
52 pub fn new(reader: R, metrics: M) -> Self {
53 Self {
54 reader,
55 metrics,
56 _phantom: PhantomData,
57 }
58 }
59}
60
61impl<R, M, Out, In> AsyncRead for Reader<R, M, Out, In>
62where
63 R: AsyncRead,
64 M: Metrics<Out, In>,
65{
66 fn poll_read(
67 self: Pin<&mut Self>,
68 cx: &mut Context<'_>,
69 buf: &mut ReadBuf<'_>,
70 ) -> Poll<io::Result<()>> {
71 let initial_len = buf.filled().len();
72
73 let this = self.project();
74 let poll = this.reader.poll_read(cx, buf);
75
76 let len = buf.filled().len() - initial_len;
77 if len > 0 {
78 this.metrics.bytes_received(len);
79 }
80
81 poll
82 }
83}
84
85#[pin_project::pin_project]
87pub(super) struct Writer<W, M, Out, In> {
88 #[pin]
89 writer: W,
90 metrics: M,
91 _phantom: PhantomData<(Out, In)>,
92}
93
94impl<W, M, Out, In> Writer<W, M, Out, In> {
95 pub fn new(writer: W, metrics: M) -> Self {
96 Self {
97 writer,
98 metrics,
99 _phantom: PhantomData,
100 }
101 }
102}
103
104impl<W, M, Out, In> AsyncWrite for Writer<W, M, Out, In>
105where
106 W: AsyncWrite,
107 M: Metrics<Out, In>,
108{
109 fn poll_write(
110 self: Pin<&mut Self>,
111 cx: &mut Context<'_>,
112 buf: &[u8],
113 ) -> Poll<Result<usize, io::Error>> {
114 let this = self.project();
115 let poll = this.writer.poll_write(cx, buf);
116
117 if let Poll::Ready(Ok(len)) = &poll
118 && *len > 0
119 {
120 this.metrics.bytes_sent(*len);
121 }
122
123 poll
124 }
125
126 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
127 self.project().writer.poll_flush(cx)
128 }
129
130 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
131 self.project().writer.poll_shutdown(cx)
132 }
133}