tower/buffer/
future.rs
1use super::{error::Closed, message};
6use futures_core::ready;
7use pin_project_lite::pin_project;
8use std::{
9 future::Future,
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14pin_project! {
15 #[derive(Debug)]
17 pub struct ResponseFuture<T> {
18 #[pin]
19 state: ResponseState<T>,
20 }
21}
22
23pin_project! {
24 #[project = ResponseStateProj]
25 #[derive(Debug)]
26 enum ResponseState<T> {
27 Failed {
28 error: Option<crate::BoxError>,
29 },
30 Rx {
31 #[pin]
32 rx: message::Rx<T>,
33 },
34 Poll {
35 #[pin]
36 fut: T,
37 },
38 }
39}
40
41impl<T> ResponseFuture<T> {
42 pub(crate) fn new(rx: message::Rx<T>) -> Self {
43 ResponseFuture {
44 state: ResponseState::Rx { rx },
45 }
46 }
47
48 pub(crate) fn failed(err: crate::BoxError) -> Self {
49 ResponseFuture {
50 state: ResponseState::Failed { error: Some(err) },
51 }
52 }
53}
54
55impl<F, T, E> Future for ResponseFuture<F>
56where
57 F: Future<Output = Result<T, E>>,
58 E: Into<crate::BoxError>,
59{
60 type Output = Result<T, crate::BoxError>;
61
62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63 let mut this = self.project();
64
65 loop {
66 match this.state.as_mut().project() {
67 ResponseStateProj::Failed { error } => {
68 return Poll::Ready(Err(error.take().expect("polled after error")));
69 }
70 ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
71 Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
72 Ok(Err(e)) => return Poll::Ready(Err(e.into())),
73 Err(_) => return Poll::Ready(Err(Closed::new().into())),
74 },
75 ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
76 }
77 }
78 }
79}