tower/filter/
future.rs
1use super::AsyncPredicate;
4use crate::BoxError;
5use futures_core::ready;
6use pin_project_lite::pin_project;
7use std::{
8 future::Future,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tower_service::Service;
13
14pin_project! {
15 #[derive(Debug)]
19 pub struct AsyncResponseFuture<P, S, Request>
20 where
21 P: AsyncPredicate<Request>,
22 S: Service<P::Request>,
23 {
24 #[pin]
25 state: State<P::Future, S::Future>,
26
27 service: S,
29 }
30}
31
32opaque_future! {
33 pub type ResponseFuture<R, F> =
37 futures_util::future::Either<
38 futures_util::future::Ready<Result<R, crate::BoxError>>,
39 futures_util::future::ErrInto<F, crate::BoxError>
40 >;
41}
42
43pin_project! {
44 #[project = StateProj]
45 #[derive(Debug)]
46 enum State<F, G> {
47 Check {
49 #[pin]
50 check: F
51 },
52 WaitResponse {
54 #[pin]
55 response: G
56 },
57 }
58}
59
60impl<P, S, Request> AsyncResponseFuture<P, S, Request>
61where
62 P: AsyncPredicate<Request>,
63 S: Service<P::Request>,
64 S::Error: Into<BoxError>,
65{
66 pub(crate) fn new(check: P::Future, service: S) -> Self {
67 Self {
68 state: State::Check { check },
69 service,
70 }
71 }
72}
73
74impl<P, S, Request> Future for AsyncResponseFuture<P, S, Request>
75where
76 P: AsyncPredicate<Request>,
77 S: Service<P::Request>,
78 S::Error: Into<crate::BoxError>,
79{
80 type Output = Result<S::Response, crate::BoxError>;
81
82 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 let mut this = self.project();
84
85 loop {
86 match this.state.as_mut().project() {
87 StateProj::Check { mut check } => {
88 let request = ready!(check.as_mut().poll(cx))?;
89 let response = this.service.call(request);
90 this.state.set(State::WaitResponse { response });
91 }
92 StateProj::WaitResponse { response } => {
93 return response.poll(cx).map_err(Into::into);
94 }
95 }
96 }
97 }
98}