tonic/transport/channel/service/
connection.rs

1use super::{AddOrigin, Reconnect, SharedExec, UserAgent};
2use crate::{
3    body::{boxed, BoxBody},
4    transport::{channel::BoxFuture, service::GrpcTimeout, Endpoint},
5};
6use http::{Request, Response, Uri};
7use hyper::rt;
8use hyper::{client::conn::http2::Builder, rt::Executor};
9use hyper_util::rt::TokioTimer;
10use std::{
11    fmt,
12    task::{Context, Poll},
13};
14use tower::load::Load;
15use tower::{
16    layer::Layer,
17    limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
18    util::BoxService,
19    ServiceBuilder, ServiceExt,
20};
21use tower_service::Service;
22
23pub(crate) struct Connection {
24    inner: BoxService<Request<BoxBody>, Response<BoxBody>, crate::Error>,
25}
26
27impl Connection {
28    fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
29    where
30        C: Service<Uri> + Send + 'static,
31        C::Error: Into<crate::Error> + Send,
32        C::Future: Send,
33        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
34    {
35        let mut settings: Builder<SharedExec> = Builder::new(endpoint.executor.clone())
36            .initial_stream_window_size(endpoint.init_stream_window_size)
37            .initial_connection_window_size(endpoint.init_connection_window_size)
38            .keep_alive_interval(endpoint.http2_keep_alive_interval)
39            .timer(TokioTimer::new())
40            .clone();
41
42        if let Some(val) = endpoint.http2_keep_alive_timeout {
43            settings.keep_alive_timeout(val);
44        }
45
46        if let Some(val) = endpoint.http2_keep_alive_while_idle {
47            settings.keep_alive_while_idle(val);
48        }
49
50        if let Some(val) = endpoint.http2_adaptive_window {
51            settings.adaptive_window(val);
52        }
53
54        if let Some(val) = endpoint.http2_max_header_list_size {
55            settings.max_header_list_size(val);
56        }
57
58        let stack = ServiceBuilder::new()
59            .layer_fn(|s| {
60                let origin = endpoint.origin.as_ref().unwrap_or(&endpoint.uri).clone();
61
62                AddOrigin::new(s, origin)
63            })
64            .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
65            .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
66            .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
67            .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
68            .into_inner();
69
70        let make_service =
71            MakeSendRequestService::new(connector, endpoint.executor.clone(), settings);
72
73        let conn = Reconnect::new(make_service, endpoint.uri.clone(), is_lazy);
74
75        Self {
76            inner: BoxService::new(stack.layer(conn)),
77        }
78    }
79
80    pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
81    where
82        C: Service<Uri> + Send + 'static,
83        C::Error: Into<crate::Error> + Send,
84        C::Future: Unpin + Send,
85        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
86    {
87        Self::new(connector, endpoint, false).ready_oneshot().await
88    }
89
90    pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
91    where
92        C: Service<Uri> + Send + 'static,
93        C::Error: Into<crate::Error> + Send,
94        C::Future: Send,
95        C::Response: rt::Read + rt::Write + Unpin + Send + 'static,
96    {
97        Self::new(connector, endpoint, true)
98    }
99}
100
101impl Service<Request<BoxBody>> for Connection {
102    type Response = Response<BoxBody>;
103    type Error = crate::Error;
104    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
105
106    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
107        Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
108    }
109
110    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
111        self.inner.call(req)
112    }
113}
114
115impl Load for Connection {
116    type Metric = usize;
117
118    fn load(&self) -> Self::Metric {
119        0
120    }
121}
122
123impl fmt::Debug for Connection {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        f.debug_struct("Connection").finish()
126    }
127}
128
129struct SendRequest {
130    inner: hyper::client::conn::http2::SendRequest<BoxBody>,
131}
132
133impl From<hyper::client::conn::http2::SendRequest<BoxBody>> for SendRequest {
134    fn from(inner: hyper::client::conn::http2::SendRequest<BoxBody>) -> Self {
135        Self { inner }
136    }
137}
138
139impl tower::Service<Request<BoxBody>> for SendRequest {
140    type Response = Response<BoxBody>;
141    type Error = crate::Error;
142    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
143
144    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145        self.inner.poll_ready(cx).map_err(Into::into)
146    }
147
148    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
149        let fut = self.inner.send_request(req);
150
151        Box::pin(async move { fut.await.map_err(Into::into).map(|res| res.map(boxed)) })
152    }
153}
154
155struct MakeSendRequestService<C> {
156    connector: C,
157    executor: SharedExec,
158    settings: Builder<SharedExec>,
159}
160
161impl<C> MakeSendRequestService<C> {
162    fn new(connector: C, executor: SharedExec, settings: Builder<SharedExec>) -> Self {
163        Self {
164            connector,
165            executor,
166            settings,
167        }
168    }
169}
170
171impl<C> tower::Service<Uri> for MakeSendRequestService<C>
172where
173    C: Service<Uri> + Send + 'static,
174    C::Error: Into<crate::Error> + Send,
175    C::Future: Send,
176    C::Response: rt::Read + rt::Write + Unpin + Send,
177{
178    type Response = SendRequest;
179    type Error = crate::Error;
180    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
181
182    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
183        self.connector.poll_ready(cx).map_err(Into::into)
184    }
185
186    fn call(&mut self, req: Uri) -> Self::Future {
187        let fut = self.connector.call(req);
188        let builder = self.settings.clone();
189        let executor = self.executor.clone();
190
191        Box::pin(async move {
192            let io = fut.await.map_err(Into::into)?;
193            let (send_request, conn) = builder.handshake(io).await?;
194
195            Executor::<BoxFuture<'static, ()>>::execute(
196                &executor,
197                Box::pin(async move {
198                    if let Err(e) = conn.await {
199                        tracing::debug!("connection task error: {:?}", e);
200                    }
201                }) as _,
202            );
203
204            Ok(SendRequest::from(send_request))
205        })
206    }
207}