tower/buffer/
future.rs

1//! Future types for the [`Buffer`] middleware.
2//!
3//! [`Buffer`]: crate::buffer::Buffer
4
5use super::{error::Closed, message};
6use futures_core::ready;
7use pin_project_lite::pin_project;
8use std::{
9    future::Future,
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14pin_project! {
15    /// Future that completes when the buffered service eventually services the submitted request.
16    #[derive(Debug)]
17    pub struct ResponseFuture<T> {
18        #[pin]
19        state: ResponseState<T>,
20    }
21}
22
23pin_project! {
24    #[project = ResponseStateProj]
25    #[derive(Debug)]
26    enum ResponseState<T> {
27        Failed {
28            error: Option<crate::BoxError>,
29        },
30        Rx {
31            #[pin]
32            rx: message::Rx<T>,
33        },
34        Poll {
35            #[pin]
36            fut: T,
37        },
38    }
39}
40
41impl<T> ResponseFuture<T> {
42    pub(crate) fn new(rx: message::Rx<T>) -> Self {
43        ResponseFuture {
44            state: ResponseState::Rx { rx },
45        }
46    }
47
48    pub(crate) fn failed(err: crate::BoxError) -> Self {
49        ResponseFuture {
50            state: ResponseState::Failed { error: Some(err) },
51        }
52    }
53}
54
55impl<F, T, E> Future for ResponseFuture<F>
56where
57    F: Future<Output = Result<T, E>>,
58    E: Into<crate::BoxError>,
59{
60    type Output = Result<T, crate::BoxError>;
61
62    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63        let mut this = self.project();
64
65        loop {
66            match this.state.as_mut().project() {
67                ResponseStateProj::Failed { error } => {
68                    return Poll::Ready(Err(error.take().expect("polled after error")));
69                }
70                ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
71                    Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
72                    Ok(Err(e)) => return Poll::Ready(Err(e.into())),
73                    Err(_) => return Poll::Ready(Err(Closed::new().into())),
74                },
75                ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
76            }
77        }
78    }
79}