tonic/transport/channel/service/
reconnect.rs

1use crate::Error;
2use pin_project::pin_project;
3use std::fmt;
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tower::make::MakeService;
10use tower_service::Service;
11use tracing::trace;
12
13pub(crate) struct Reconnect<M, Target>
14where
15    M: Service<Target>,
16    M::Error: Into<Error>,
17{
18    mk_service: M,
19    state: State<M::Future, M::Response>,
20    target: Target,
21    error: Option<crate::Error>,
22    has_been_connected: bool,
23    is_lazy: bool,
24}
25
26#[derive(Debug)]
27enum State<F, S> {
28    Idle,
29    Connecting(F),
30    Connected(S),
31}
32
33impl<M, Target> Reconnect<M, Target>
34where
35    M: Service<Target>,
36    M::Error: Into<Error>,
37{
38    pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
39        Reconnect {
40            mk_service,
41            state: State::Idle,
42            target,
43            error: None,
44            has_been_connected: false,
45            is_lazy,
46        }
47    }
48}
49
50impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
51where
52    M: Service<Target, Response = S>,
53    S: Service<Request>,
54    M::Future: Unpin,
55    Error: From<M::Error> + From<S::Error>,
56    Target: Clone,
57    <M as tower_service::Service<Target>>::Error: Into<crate::Error>,
58{
59    type Response = S::Response;
60    type Error = Error;
61    type Future = ResponseFuture<S::Future>;
62
63    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64        let mut state;
65
66        if self.error.is_some() {
67            return Poll::Ready(Ok(()));
68        }
69
70        loop {
71            match self.state {
72                State::Idle => {
73                    trace!("poll_ready; idle");
74                    match self.mk_service.poll_ready(cx) {
75                        Poll::Ready(r) => r?,
76                        Poll::Pending => {
77                            trace!("poll_ready; MakeService not ready");
78                            return Poll::Pending;
79                        }
80                    }
81
82                    let fut = self.mk_service.make_service(self.target.clone());
83                    self.state = State::Connecting(fut);
84                    continue;
85                }
86                State::Connecting(ref mut f) => {
87                    trace!("poll_ready; connecting");
88                    match Pin::new(f).poll(cx) {
89                        Poll::Ready(Ok(service)) => {
90                            state = State::Connected(service);
91                        }
92                        Poll::Pending => {
93                            trace!("poll_ready; not ready");
94                            return Poll::Pending;
95                        }
96                        Poll::Ready(Err(e)) => {
97                            trace!("poll_ready; error");
98
99                            state = State::Idle;
100
101                            if !(self.has_been_connected || self.is_lazy) {
102                                return Poll::Ready(Err(e.into()));
103                            } else {
104                                let error = e.into();
105                                tracing::debug!("reconnect::poll_ready: {:?}", error);
106                                self.error = Some(error);
107                                break;
108                            }
109                        }
110                    }
111                }
112                State::Connected(ref mut inner) => {
113                    trace!("poll_ready; connected");
114
115                    self.has_been_connected = true;
116
117                    match inner.poll_ready(cx) {
118                        Poll::Ready(Ok(())) => {
119                            trace!("poll_ready; ready");
120                            return Poll::Ready(Ok(()));
121                        }
122                        Poll::Pending => {
123                            trace!("poll_ready; not ready");
124                            return Poll::Pending;
125                        }
126                        Poll::Ready(Err(_)) => {
127                            trace!("poll_ready; error");
128                            state = State::Idle;
129                        }
130                    }
131                }
132            }
133
134            self.state = state;
135        }
136
137        self.state = state;
138        Poll::Ready(Ok(()))
139    }
140
141    fn call(&mut self, request: Request) -> Self::Future {
142        tracing::trace!("Reconnect::call");
143        if let Some(error) = self.error.take() {
144            tracing::debug!("error: {}", error);
145            return ResponseFuture::error(error);
146        }
147
148        let State::Connected(service) = &mut self.state else {
149            panic!("service not ready; poll_ready must be called first");
150        };
151
152        let fut = service.call(request);
153        ResponseFuture::new(fut)
154    }
155}
156
157impl<M, Target> fmt::Debug for Reconnect<M, Target>
158where
159    M: Service<Target> + fmt::Debug,
160    M::Future: fmt::Debug,
161    M::Response: fmt::Debug,
162    Target: fmt::Debug,
163    <M as tower_service::Service<Target>>::Error: Into<Error>,
164{
165    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
166        fmt.debug_struct("Reconnect")
167            .field("mk_service", &self.mk_service)
168            .field("state", &self.state)
169            .field("target", &self.target)
170            .finish()
171    }
172}
173
174/// Future that resolves to the response or failure to connect.
175#[pin_project]
176#[derive(Debug)]
177pub(crate) struct ResponseFuture<F> {
178    #[pin]
179    inner: Inner<F>,
180}
181
182#[pin_project(project = InnerProj)]
183#[derive(Debug)]
184enum Inner<F> {
185    Future(#[pin] F),
186    Error(Option<crate::Error>),
187}
188
189impl<F> ResponseFuture<F> {
190    pub(crate) fn new(inner: F) -> Self {
191        ResponseFuture {
192            inner: Inner::Future(inner),
193        }
194    }
195
196    pub(crate) fn error(error: crate::Error) -> Self {
197        ResponseFuture {
198            inner: Inner::Error(Some(error)),
199        }
200    }
201}
202
203impl<F, T, E> Future for ResponseFuture<F>
204where
205    F: Future<Output = Result<T, E>>,
206    E: Into<Error>,
207{
208    type Output = Result<T, Error>;
209
210    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211        //self.project().inner.poll(cx).map_err(Into::into)
212        let me = self.project();
213        match me.inner.project() {
214            InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
215            InnerProj::Error(e) => {
216                let e = e.take().expect("Polled after ready.");
217                Poll::Ready(Err(e))
218            }
219        }
220    }
221}