tower/buffer/service.rs
1use super::{
2 future::ResponseFuture,
3 message::Message,
4 worker::{Handle, Worker},
5};
6
7use std::{
8 future::Future,
9 task::{Context, Poll},
10};
11use tokio::sync::{mpsc, oneshot};
12use tokio_util::sync::PollSender;
13use tower_service::Service;
14
15/// Adds an mpsc buffer in front of an inner service.
16///
17/// See the module documentation for more details.
18#[derive(Debug)]
19pub struct Buffer<Req, F> {
20 tx: PollSender<Message<Req, F>>,
21 handle: Handle,
22}
23
24impl<Req, F> Buffer<Req, F>
25where
26 F: 'static,
27{
28 /// Creates a new [`Buffer`] wrapping `service`.
29 ///
30 /// `bound` gives the maximal number of requests that can be queued for the service before
31 /// backpressure is applied to callers.
32 ///
33 /// The default Tokio executor is used to run the given service, which means that this method
34 /// must be called while on the Tokio runtime.
35 ///
36 /// # A note on choosing a `bound`
37 ///
38 /// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
39 /// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
40 /// this reserved slot may be held up for a long time. As a result, it's advisable to set
41 /// `bound` to be at least the maximum number of concurrent requests the [`Buffer`] will see.
42 /// If you do not, all the slots in the buffer may be held up by futures that have just called
43 /// [`poll_ready`] but will not issue a [`call`], which prevents other senders from issuing new
44 /// requests.
45 ///
46 /// [`Poll::Ready`]: std::task::Poll::Ready
47 /// [`call`]: crate::Service::call
48 /// [`poll_ready`]: crate::Service::poll_ready
49 pub fn new<S>(service: S, bound: usize) -> Self
50 where
51 S: Service<Req, Future = F> + Send + 'static,
52 F: Send,
53 S::Error: Into<crate::BoxError> + Send + Sync,
54 Req: Send + 'static,
55 {
56 let (service, worker) = Self::pair(service, bound);
57 tokio::spawn(worker);
58 service
59 }
60
61 /// Creates a new [`Buffer`] wrapping `service`, but returns the background worker.
62 ///
63 /// This is useful if you do not want to spawn directly onto the tokio runtime
64 /// but instead want to use your own executor. This will return the [`Buffer`] and
65 /// the background `Worker` that you can then spawn.
66 pub fn pair<S>(service: S, bound: usize) -> (Self, Worker<S, Req>)
67 where
68 S: Service<Req, Future = F> + Send + 'static,
69 F: Send,
70 S::Error: Into<crate::BoxError> + Send + Sync,
71 Req: Send + 'static,
72 {
73 let (tx, rx) = mpsc::channel(bound);
74 let (handle, worker) = Worker::new(service, rx);
75 let buffer = Self {
76 tx: PollSender::new(tx),
77 handle,
78 };
79 (buffer, worker)
80 }
81
82 fn get_worker_error(&self) -> crate::BoxError {
83 self.handle.get_error_on_closed()
84 }
85}
86
87impl<Req, Rsp, F, E> Service<Req> for Buffer<Req, F>
88where
89 F: Future<Output = Result<Rsp, E>> + Send + 'static,
90 E: Into<crate::BoxError>,
91 Req: Send + 'static,
92{
93 type Response = Rsp;
94 type Error = crate::BoxError;
95 type Future = ResponseFuture<F>;
96
97 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
98 // First, check if the worker is still alive.
99 if self.tx.is_closed() {
100 // If the inner service has errored, then we error here.
101 return Poll::Ready(Err(self.get_worker_error()));
102 }
103
104 // Poll the sender to acquire a permit.
105 self.tx
106 .poll_reserve(cx)
107 .map_err(|_| self.get_worker_error())
108 }
109
110 fn call(&mut self, request: Req) -> Self::Future {
111 tracing::trace!("sending request to buffer worker");
112
113 // get the current Span so that we can explicitly propagate it to the worker
114 // if we didn't do this, events on the worker related to this span wouldn't be counted
115 // towards that span since the worker would have no way of entering it.
116 let span = tracing::Span::current();
117
118 // If we've made it here, then a channel permit has already been
119 // acquired, so we can freely allocate a oneshot.
120 let (tx, rx) = oneshot::channel();
121
122 match self.tx.send_item(Message { request, span, tx }) {
123 Ok(_) => ResponseFuture::new(rx),
124 // If the channel is closed, propagate the error from the worker.
125 Err(_) => {
126 tracing::trace!("buffer channel closed");
127 ResponseFuture::failed(self.get_worker_error())
128 }
129 }
130 }
131}
132
133impl<Req, F> Clone for Buffer<Req, F>
134where
135 Req: Send + 'static,
136 F: Send + 'static,
137{
138 fn clone(&self) -> Self {
139 Self {
140 handle: self.handle.clone(),
141 tx: self.tx.clone(),
142 }
143 }
144}