mz_service/transport/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics support for the Cluster Transport Protocol.
11
12use std::io;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
18
/// Observer interface for metric events that occur on a connection.
///
/// `Out` is the type of messages passed to [`Metrics::message_sent`] and `In` is the type of
/// messages passed to [`Metrics::message_received`]. Implementors must be cloneable, sendable
/// across threads, and free of non-`'static` borrows.
pub trait Metrics<Out, In>: Clone + Send + 'static {
    // Outbound events.

    /// Invoked with the number of bytes (`len`) that were sent.
    fn bytes_sent(&mut self, len: usize);
    /// Invoked with a reference to a message (`msg`) that was sent.
    fn message_sent(&mut self, msg: &Out);

    // Inbound events.

    /// Invoked with the number of bytes (`len`) that were received.
    fn bytes_received(&mut self, len: usize);
    /// Invoked with a reference to a message (`msg`) that was received.
    fn message_received(&mut self, msg: &In);
}
30
/// No-op [`Metrics`] implementation that ignores all events.
///
/// This is a unit struct with no state, so it can also be created through [`Default`] and
/// printed through [`std::fmt::Debug`].
#[derive(Clone, Debug, Default)]
pub struct NoopMetrics;
34
35impl<Out, In> Metrics<Out, In> for NoopMetrics {
36    fn bytes_sent(&mut self, _len: usize) {}
37    fn bytes_received(&mut self, _len: usize) {}
38    fn message_sent(&mut self, _msg: &Out) {}
39    fn message_received(&mut self, _msg: &In) {}
40}
41
42/// [`AsyncRead`] wrapper that transparently logs `bytes_received` metrics.
43#[pin_project::pin_project]
44pub(super) struct Reader<R, M, Out, In> {
45    #[pin]
46    reader: R,
47    metrics: M,
48    _phantom: PhantomData<(Out, In)>,
49}
50
51impl<R, M, Out, In> Reader<R, M, Out, In> {
52    pub fn new(reader: R, metrics: M) -> Self {
53        Self {
54            reader,
55            metrics,
56            _phantom: PhantomData,
57        }
58    }
59}
60
61impl<R, M, Out, In> AsyncRead for Reader<R, M, Out, In>
62where
63    R: AsyncRead,
64    M: Metrics<Out, In>,
65{
66    fn poll_read(
67        self: Pin<&mut Self>,
68        cx: &mut Context<'_>,
69        buf: &mut ReadBuf<'_>,
70    ) -> Poll<io::Result<()>> {
71        let initial_len = buf.filled().len();
72
73        let this = self.project();
74        let poll = this.reader.poll_read(cx, buf);
75
76        let len = buf.filled().len() - initial_len;
77        if len > 0 {
78            this.metrics.bytes_received(len);
79        }
80
81        poll
82    }
83}
84
85/// [`AsyncWrite`] wrapper that transparently logs `bytes_sent` metrics.
86#[pin_project::pin_project]
87pub(super) struct Writer<W, M, Out, In> {
88    #[pin]
89    writer: W,
90    metrics: M,
91    _phantom: PhantomData<(Out, In)>,
92}
93
94impl<W, M, Out, In> Writer<W, M, Out, In> {
95    pub fn new(writer: W, metrics: M) -> Self {
96        Self {
97            writer,
98            metrics,
99            _phantom: PhantomData,
100        }
101    }
102}
103
104impl<W, M, Out, In> AsyncWrite for Writer<W, M, Out, In>
105where
106    W: AsyncWrite,
107    M: Metrics<Out, In>,
108{
109    fn poll_write(
110        self: Pin<&mut Self>,
111        cx: &mut Context<'_>,
112        buf: &[u8],
113    ) -> Poll<Result<usize, io::Error>> {
114        let this = self.project();
115        let poll = this.writer.poll_write(cx, buf);
116
117        if let Poll::Ready(Ok(len)) = &poll
118            && *len > 0
119        {
120            this.metrics.bytes_sent(*len);
121        }
122
123        poll
124    }
125
126    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
127        self.project().writer.poll_flush(cx)
128    }
129
130    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
131        self.project().writer.poll_shutdown(cx)
132    }
133}