tonic/transport/channel/
mod.rs
1mod endpoint;
4pub(crate) mod service;
5#[cfg(feature = "tls")]
6mod tls;
7
8pub use endpoint::Endpoint;
9#[cfg(feature = "tls")]
10pub use tls::ClientTlsConfig;
11
12use self::service::{Connection, DynamicServiceStream, Executor, SharedExec};
13use crate::body::BoxBody;
14use bytes::Bytes;
15use http::{
16 uri::{InvalidUri, Uri},
17 Request, Response,
18};
19use hyper_util::client::legacy::connect::Connection as HyperConnection;
20use std::{
21 fmt,
22 future::Future,
23 hash::Hash,
24 pin::Pin,
25 task::{Context, Poll},
26};
27use tokio::sync::mpsc::{channel, Sender};
28
29use hyper::rt;
30use tower::balance::p2c::Balance;
31use tower::{
32 buffer::{self, Buffer},
33 discover::{Change, Discover},
34 util::{BoxService, Either},
35 Service,
36};
37
38type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
39type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<BoxBody>, crate::Error>>;
40
41const DEFAULT_BUFFER_SIZE: usize = 1024;
42
43#[derive(Clone)]
67pub struct Channel {
68 svc: Buffer<Svc, Request<BoxBody>>,
69}
70
71pub struct ResponseFuture {
75 inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>,
76}
77
78impl Channel {
79 pub fn builder(uri: Uri) -> Endpoint {
81 Endpoint::from(uri)
82 }
83
84 pub fn from_static(s: &'static str) -> Endpoint {
91 let uri = Uri::from_static(s);
92 Self::builder(uri)
93 }
94
95 pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
102 let uri = Uri::from_maybe_shared(s.into())?;
103 Ok(Self::builder(uri))
104 }
105
106 pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
111 let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
112 list.for_each(|endpoint| {
113 tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
114 .unwrap();
115 });
116
117 channel
118 }
119
120 pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
124 where
125 K: Hash + Eq + Send + Clone + 'static,
126 {
127 Self::balance_channel_with_executor(capacity, SharedExec::tokio())
128 }
129
130 pub fn balance_channel_with_executor<K, E>(
136 capacity: usize,
137 executor: E,
138 ) -> (Self, Sender<Change<K, Endpoint>>)
139 where
140 K: Hash + Eq + Send + Clone + 'static,
141 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
142 {
143 let (tx, rx) = channel(capacity);
144 let list = DynamicServiceStream::new(rx);
145 (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
146 }
147
148 pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
149 where
150 C: Service<Uri> + Send + 'static,
151 C::Error: Into<crate::Error> + Send,
152 C::Future: Send,
153 C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
154 {
155 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
156 let executor = endpoint.executor.clone();
157
158 let svc = Connection::lazy(connector, endpoint);
159 let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size);
160 executor.execute(worker);
161
162 Channel { svc }
163 }
164
165 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
166 where
167 C: Service<Uri> + Send + 'static,
168 C::Error: Into<crate::Error> + Send,
169 C::Future: Unpin + Send,
170 C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
171 {
172 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
173 let executor = endpoint.executor.clone();
174
175 let svc = Connection::connect(connector, endpoint)
176 .await
177 .map_err(super::Error::from_source)?;
178 let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size);
179 executor.execute(worker);
180
181 Ok(Channel { svc })
182 }
183
184 pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
185 where
186 D: Discover<Service = Connection> + Unpin + Send + 'static,
187 D::Error: Into<crate::Error>,
188 D::Key: Hash + Send + Clone,
189 E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
190 {
191 let svc = Balance::new(discover);
192
193 let svc = BoxService::new(svc);
194 let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size);
195 executor.execute(Box::pin(worker));
196
197 Channel { svc }
198 }
199}
200
201impl Service<http::Request<BoxBody>> for Channel {
202 type Response = http::Response<BoxBody>;
203 type Error = super::Error;
204 type Future = ResponseFuture;
205
206 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207 Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
208 }
209
210 fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
211 let inner = Service::call(&mut self.svc, request);
212
213 ResponseFuture { inner }
214 }
215}
216
217impl Future for ResponseFuture {
218 type Output = Result<Response<BoxBody>, super::Error>;
219
220 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221 Pin::new(&mut self.inner)
222 .poll(cx)
223 .map_err(super::Error::from_source)
224 }
225}
226
227impl fmt::Debug for Channel {
228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229 f.debug_struct("Channel").finish()
230 }
231}
232
233impl fmt::Debug for ResponseFuture {
234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235 f.debug_struct("ResponseFuture").finish()
236 }
237}