1use super::{Policy, Retry};
4use futures_core::ready;
5use pin_project_lite::pin_project;
6use std::future::Future;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use tower_service::Service;
10
11pin_project! {
12 #[derive(Debug)]
14 pub struct ResponseFuture<P, S, Request>
15 where
16 P: Policy<Request, S::Response, S::Error>,
17 S: Service<Request>,
18 {
19 request: Option<Request>,
20 #[pin]
21 retry: Retry<P, S>,
22 #[pin]
23 state: State<S::Future, P::Future>,
24 }
25}
26
27pin_project! {
28 #[project = StateProj]
29 #[derive(Debug)]
30 enum State<F, P> {
31 Called {
33 #[pin]
34 future: F
35 },
36 Waiting {
38 #[pin]
39 waiting: P
40 },
41 Retrying,
43 }
44}
45
46impl<P, S, Request> ResponseFuture<P, S, Request>
47where
48 P: Policy<Request, S::Response, S::Error>,
49 S: Service<Request>,
50{
51 pub(crate) fn new(
52 request: Option<Request>,
53 retry: Retry<P, S>,
54 future: S::Future,
55 ) -> ResponseFuture<P, S, Request> {
56 ResponseFuture {
57 request,
58 retry,
59 state: State::Called { future },
60 }
61 }
62}
63
64impl<P, S, Request> Future for ResponseFuture<P, S, Request>
65where
66 P: Policy<Request, S::Response, S::Error>,
67 S: Service<Request>,
68{
69 type Output = Result<S::Response, S::Error>;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 let mut this = self.project();
73
74 loop {
75 match this.state.as_mut().project() {
76 StateProj::Called { future } => {
77 let mut result = ready!(future.poll(cx));
78 if let Some(req) = &mut this.request {
79 match this.retry.policy.retry(req, &mut result) {
80 Some(waiting) => {
81 this.state.set(State::Waiting { waiting });
82 }
83 None => return Poll::Ready(result),
84 }
85 } else {
86 return Poll::Ready(result);
88 }
89 }
90 StateProj::Waiting { waiting } => {
91 ready!(waiting.poll(cx));
92
93 this.state.set(State::Retrying);
94 }
95 StateProj::Retrying => {
96 ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
108 let req = this
109 .request
110 .take()
111 .expect("retrying requires cloned request");
112 *this.request = this.retry.policy.clone_request(&req);
113 this.state.set(State::Called {
114 future: this.retry.as_mut().project().service.call(req),
115 });
116 }
117 }
118 }
119 }
120}