tonic/transport/channel/service/
reconnect.rs
1use crate::Error;
2use pin_project::pin_project;
3use std::fmt;
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tower::make::MakeService;
10use tower_service::Service;
11use tracing::trace;
12
13pub(crate) struct Reconnect<M, Target>
14where
15 M: Service<Target>,
16 M::Error: Into<Error>,
17{
18 mk_service: M,
19 state: State<M::Future, M::Response>,
20 target: Target,
21 error: Option<crate::Error>,
22 has_been_connected: bool,
23 is_lazy: bool,
24}
25
/// Connection state tracked by `Reconnect`.
#[derive(Debug)]
enum State<Fut, Svc> {
    /// No inner service exists and no connection attempt is in flight.
    Idle,
    /// A connection attempt is in flight; holds the future returned by the
    /// service factory.
    Connecting(Fut),
    /// The connection attempt completed; holds the resulting inner service.
    Connected(Svc),
}
32
33impl<M, Target> Reconnect<M, Target>
34where
35 M: Service<Target>,
36 M::Error: Into<Error>,
37{
38 pub(crate) fn new(mk_service: M, target: Target, is_lazy: bool) -> Self {
39 Reconnect {
40 mk_service,
41 state: State::Idle,
42 target,
43 error: None,
44 has_been_connected: false,
45 is_lazy,
46 }
47 }
48}
49
50impl<M, Target, S, Request> Service<Request> for Reconnect<M, Target>
51where
52 M: Service<Target, Response = S>,
53 S: Service<Request>,
54 M::Future: Unpin,
55 Error: From<M::Error> + From<S::Error>,
56 Target: Clone,
57 <M as tower_service::Service<Target>>::Error: Into<crate::Error>,
58{
59 type Response = S::Response;
60 type Error = Error;
61 type Future = ResponseFuture<S::Future>;
62
63 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
64 let mut state;
65
66 if self.error.is_some() {
67 return Poll::Ready(Ok(()));
68 }
69
70 loop {
71 match self.state {
72 State::Idle => {
73 trace!("poll_ready; idle");
74 match self.mk_service.poll_ready(cx) {
75 Poll::Ready(r) => r?,
76 Poll::Pending => {
77 trace!("poll_ready; MakeService not ready");
78 return Poll::Pending;
79 }
80 }
81
82 let fut = self.mk_service.make_service(self.target.clone());
83 self.state = State::Connecting(fut);
84 continue;
85 }
86 State::Connecting(ref mut f) => {
87 trace!("poll_ready; connecting");
88 match Pin::new(f).poll(cx) {
89 Poll::Ready(Ok(service)) => {
90 state = State::Connected(service);
91 }
92 Poll::Pending => {
93 trace!("poll_ready; not ready");
94 return Poll::Pending;
95 }
96 Poll::Ready(Err(e)) => {
97 trace!("poll_ready; error");
98
99 state = State::Idle;
100
101 if !(self.has_been_connected || self.is_lazy) {
102 return Poll::Ready(Err(e.into()));
103 } else {
104 let error = e.into();
105 tracing::debug!("reconnect::poll_ready: {:?}", error);
106 self.error = Some(error);
107 break;
108 }
109 }
110 }
111 }
112 State::Connected(ref mut inner) => {
113 trace!("poll_ready; connected");
114
115 self.has_been_connected = true;
116
117 match inner.poll_ready(cx) {
118 Poll::Ready(Ok(())) => {
119 trace!("poll_ready; ready");
120 return Poll::Ready(Ok(()));
121 }
122 Poll::Pending => {
123 trace!("poll_ready; not ready");
124 return Poll::Pending;
125 }
126 Poll::Ready(Err(_)) => {
127 trace!("poll_ready; error");
128 state = State::Idle;
129 }
130 }
131 }
132 }
133
134 self.state = state;
135 }
136
137 self.state = state;
138 Poll::Ready(Ok(()))
139 }
140
141 fn call(&mut self, request: Request) -> Self::Future {
142 tracing::trace!("Reconnect::call");
143 if let Some(error) = self.error.take() {
144 tracing::debug!("error: {}", error);
145 return ResponseFuture::error(error);
146 }
147
148 let State::Connected(service) = &mut self.state else {
149 panic!("service not ready; poll_ready must be called first");
150 };
151
152 let fut = service.call(request);
153 ResponseFuture::new(fut)
154 }
155}
156
157impl<M, Target> fmt::Debug for Reconnect<M, Target>
158where
159 M: Service<Target> + fmt::Debug,
160 M::Future: fmt::Debug,
161 M::Response: fmt::Debug,
162 Target: fmt::Debug,
163 <M as tower_service::Service<Target>>::Error: Into<Error>,
164{
165 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
166 fmt.debug_struct("Reconnect")
167 .field("mk_service", &self.mk_service)
168 .field("state", &self.state)
169 .field("target", &self.target)
170 .finish()
171 }
172}
173
174#[pin_project]
176#[derive(Debug)]
177pub(crate) struct ResponseFuture<F> {
178 #[pin]
179 inner: Inner<F>,
180}
181
182#[pin_project(project = InnerProj)]
183#[derive(Debug)]
184enum Inner<F> {
185 Future(#[pin] F),
186 Error(Option<crate::Error>),
187}
188
189impl<F> ResponseFuture<F> {
190 pub(crate) fn new(inner: F) -> Self {
191 ResponseFuture {
192 inner: Inner::Future(inner),
193 }
194 }
195
196 pub(crate) fn error(error: crate::Error) -> Self {
197 ResponseFuture {
198 inner: Inner::Error(Some(error)),
199 }
200 }
201}
202
203impl<F, T, E> Future for ResponseFuture<F>
204where
205 F: Future<Output = Result<T, E>>,
206 E: Into<Error>,
207{
208 type Output = Result<T, Error>;
209
210 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
211 let me = self.project();
213 match me.inner.project() {
214 InnerProj::Future(fut) => fut.poll(cx).map_err(Into::into),
215 InnerProj::Error(e) => {
216 let e = e.take().expect("Polled after ready.");
217 Poll::Ready(Err(e))
218 }
219 }
220 }
221}