tonic/transport/channel/service/
connection.rs
1use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2use crate::{
3 body::{boxed, BoxBody},
4 transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
5};
6use http::{Request, Response, Uri};
7use hyper::rt;
8use hyper::{client::conn::http2::Builder, rt::Executor};
9use hyper_util::rt::TokioTimer;
10use std::{
11 fmt,
12 task::{Context, Poll},
13};
14use tower::load::Load;
15use tower::{
16 layer::Layer,
17 limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
18 util::BoxService,
19 ServiceBuilder, ServiceExt,
20};
21use tower_service::Service;
22
23pub(crate) struct Connection {
24 inner: BoxService<Request<BoxBody>, Response<BoxBody>, crate::Error>,
25}
26
27impl Connection {
28 fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29 where
30 C: Service<Uri> + Send + 'static,
31 C::Error: Into<crate::Error> + Send,
32 C::Future: Send,
33 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
34 {
35 let mut settings: Builder<SharedExec> = Builder::new(endpoint.executor.clone())
36 .initial_stream_window_size(endpoint.init_stream_window_size)
37 .initial_connection_window_size(endpoint.init_connection_window_size)
38 .keep_alive_interval(endpoint.http2_keep_alive_interval)
39 .timer(TokioTimer::new())
40 .clone();
41
42 if let Some(val) = endpoint.http2_keep_alive_timeout {
43 settings.keep_alive_timeout(val);
44 }
45
46 if let Some(val) = endpoint.http2_keep_alive_while_idle {
47 settings.keep_alive_while_idle(val);
48 }
49
50 if let Some(val) = endpoint.http2_adaptive_window {
51 settings.adaptive_window(val);
52 }
53
54 if let Some(val) = endpoint.http2_max_header_list_size {
55 settings.max_header_list_size(val);
56 }
57
58 let stack = ServiceBuilder::new()
59 .layer_fn(|s| {
60 let origin = endpoint.origin.as_ref().unwrap_or(&endpoint.uri).clone();
61
62 AddOrigin::new(s, origin)
63 })
64 .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
65 .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
66 .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
67 .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
68 .into_inner();
69
70 let make_service =
71 MakeSendRequestService::new(connector, endpoint.executor.clone(), settings);
72
73 let conn = Reconnect::new(make_service, endpoint.uri.clone(), is_lazy);
74
75 Self {
76 inner: BoxService::new(stack.layer(conn)),
77 }
78 }
79
80 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
81 where
82 C: Service<Uri> + Send + 'static,
83 C::Error: Into<crate::Error> + Send,
84 C::Future: Unpin + Send,
85 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
86 {
87 Self::new(connector, endpoint, false).ready_oneshot().await
88 }
89
90 pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
91 where
92 C: Service<Uri> + Send + 'static,
93 C::Error: Into<crate::Error> + Send,
94 C::Future: Send,
95 C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
96 {
97 Self::new(connector, endpoint, true)
98 }
99}
100
101impl Service<Request<BoxBody>> for Connection {
102 type Response = Response<BoxBody>;
103 type Error = crate::Error;
104 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
105
106 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107 Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
108 }
109
110 fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
111 self.inner.call(req)
112 }
113}
114
115impl Load for Connection {
116 type Metric = usize;
117
118 fn load(&self) -> Self::Metric {
119 0
120 }
121}
122
123impl fmt::Debug for Connection {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 f.debug_struct("Connection").finish()
126 }
127}
128
129struct SendRequest {
130 inner: hyper::client::conn::http2::SendRequest<BoxBody>,
131}
132
133impl From<hyper::client::conn::http2::SendRequest<BoxBody>> for SendRequest {
134 fn from(inner: hyper::client::conn::http2::SendRequest<BoxBody>) -> Self {
135 Self { inner }
136 }
137}
138
139impl tower::Service<Request<BoxBody>> for SendRequest {
140 type Response = Response<BoxBody>;
141 type Error = crate::Error;
142 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
143
144 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145 self.inner.poll_ready(cx).map_err(Into::into)
146 }
147
148 fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
149 let fut = self.inner.send_request(req);
150
151 Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(boxed)) })
152 }
153}
154
155struct MakeSendRequestService<C> {
156 connector: C,
157 executor: SharedExec,
158 settings: Builder<SharedExec>,
159}
160
161impl<C> MakeSendRequestService<C> {
162 fn new(connector: C, executor: SharedExec, settings: Builder<SharedExec>) -> Self {
163 Self {
164 connector,
165 executor,
166 settings,
167 }
168 }
169}
170
171impl<C> tower::Service<Uri> for MakeSendRequestService<C>
172where
173 C: Service<Uri> + Send + 'static,
174 C::Error: Into<crate::Error> + Send,
175 C::Future: Send,
176 C::Response: rt::Read + rt::Write + Unpin + Send,
177{
178 type Response = SendRequest;
179 type Error = crate::Error;
180 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
181
182 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
183 self.connector.poll_ready(cx).map_err(Into::into)
184 }
185
186 fn call(&mut self, req: Uri) -> Self::Future {
187 let fut = self.connector.call(req);
188 let builder = self.settings.clone();
189 let executor = self.executor.clone();
190
191 Box::pin(async move {
192 let io = fut.await.map_err(Into::into)?;
193 let (send_request, conn) = builder.handshake(io).await?;
194
195 Executor::<BoxFuture<'static, ()>>::execute(
196 &executor,
197 Box::pin(async move {
198 if let Err(e) = conn.await {
199 tracing::debug!("connection task error: {:?}", e);
200 }
201 }) as _,
202 );
203
204 Ok(SendRequest::from(send_request))
205 })
206 }
207}