tonic/transport/channel/
mod.rs

1//! Client implementation and builder.
2
3mod endpoint;
4pub(crate) mod service;
5#[cfg(feature = "tls")]
6mod tls;
7
8pub use endpoint::Endpoint;
9#[cfg(feature = "tls")]
10pub use tls::ClientTlsConfig;
11
12use self::service::{Connection, DynamicServiceStream, Executor, SharedExec};
13use crate::body::BoxBody;
14use bytes::Bytes;
15use http::{
16    uri::{InvalidUri, Uri},
17    Request, Response,
18};
19use hyper_util::client::legacy::connect::Connection as HyperConnection;
20use std::{
21    fmt,
22    future::Future,
23    hash::Hash,
24    pin::Pin,
25    task::{Context, Poll},
26};
27use tokio::sync::mpsc::{channel, Sender};
28
29use hyper::rt;
30use tower::balance::p2c::Balance;
31use tower::{
32    buffer::{self, Buffer},
33    discover::{Change, Discover},
34    util::{BoxService, Either},
35    Service,
36};
37
38type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
39type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<BoxBody>, crate::Error>>;
40
41const DEFAULT_BUFFER_SIZE: usize = 1024;
42
43/// A default batteries included `transport` channel.
44///
45/// This provides a fully featured http2 gRPC client based on `hyper`
46/// and `tower` services.
47///
48/// # Multiplexing requests
49///
50/// Sending a request on a channel requires a `&mut self` and thus can only send
51/// one request in flight. This is intentional and is required to follow the `Service`
52/// contract from the `tower` library which this channel implementation is built on
53/// top of.
54///
55/// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply
56/// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready`
57/// we know the `Service` is able to accept only one request before we must `poll_ready`
58/// again. Due to this fact any `async fn` that wants to poll for readiness and submit
59/// the request must have a `&mut self` reference.
60///
61/// To work around this and to ease the use of the channel, `Channel` provides a
62/// `Clone` implementation that is _cheap_. This is because at the very top level
63/// the channel is backed by a `tower_buffer::Buffer` which runs the connection
64/// in a background task and provides a `mpsc` channel interface. Due to this
65/// cloning the `Channel` type is cheap and encouraged.
66#[derive(Clone)]
67pub struct Channel {
68    svc: Buffer<Svc, Request<BoxBody>>,
69}
70
71/// A future that resolves to an HTTP response.
72///
73/// This is returned by the `Service::call` on [`Channel`].
74pub struct ResponseFuture {
75    inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>,
76}
77
78impl Channel {
79    /// Create an [`Endpoint`] builder that can create [`Channel`]s.
80    pub fn builder(uri: Uri) -> Endpoint {
81        Endpoint::from(uri)
82    }
83
84    /// Create an [`Endpoint`] from a static string.
85    ///
86    /// ```
87    /// # use tonic::transport::Channel;
88    /// Channel::from_static("https://example.com");
89    /// ```
90    pub fn from_static(s: &'static str) -> Endpoint {
91        let uri = Uri::from_static(s);
92        Self::builder(uri)
93    }
94
95    /// Create an [`Endpoint`] from shared bytes.
96    ///
97    /// ```
98    /// # use tonic::transport::Channel;
99    /// Channel::from_shared("https://example.com");
100    /// ```
101    pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
102        let uri = Uri::from_maybe_shared(s.into())?;
103        Ok(Self::builder(uri))
104    }
105
106    /// Balance a list of [`Endpoint`]'s.
107    ///
108    /// This creates a [`Channel`] that will load balance across all the
109    /// provided endpoints.
110    pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
111        let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
112        list.for_each(|endpoint| {
113            tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
114                .unwrap();
115        });
116
117        channel
118    }
119
120    /// Balance a list of [`Endpoint`]'s.
121    ///
122    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
123    pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
124    where
125        K: Hash + Eq + Send + Clone + 'static,
126    {
127        Self::balance_channel_with_executor(capacity, SharedExec::tokio())
128    }
129
130    /// Balance a list of [`Endpoint`]'s.
131    ///
132    /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
133    ///
134    /// The [`Channel`] will use the given executor to spawn async tasks.
135    pub fn balance_channel_with_executor<K, E>(
136        capacity: usize,
137        executor: E,
138    ) -> (Self, Sender<Change<K, Endpoint>>)
139    where
140        K: Hash + Eq + Send + Clone + 'static,
141        E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
142    {
143        let (tx, rx) = channel(capacity);
144        let list = DynamicServiceStream::new(rx);
145        (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
146    }
147
148    pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
149    where
150        C: Service<Uri> + Send + 'static,
151        C::Error: Into<crate::Error> + Send,
152        C::Future: Send,
153        C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
154    {
155        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
156        let executor = endpoint.executor.clone();
157
158        let svc = Connection::lazy(connector, endpoint);
159        let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size);
160        executor.execute(worker);
161
162        Channel { svc }
163    }
164
165    pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
166    where
167        C: Service<Uri> + Send + 'static,
168        C::Error: Into<crate::Error> + Send,
169        C::Future: Unpin + Send,
170        C::Response: rt::Read + rt::Write + HyperConnection + Unpin + Send + 'static,
171    {
172        let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
173        let executor = endpoint.executor.clone();
174
175        let svc = Connection::connect(connector, endpoint)
176            .await
177            .map_err(super::Error::from_source)?;
178        let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size);
179        executor.execute(worker);
180
181        Ok(Channel { svc })
182    }
183
184    pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
185    where
186        D: Discover<Service = Connection> + Unpin + Send + 'static,
187        D::Error: Into<crate::Error>,
188        D::Key: Hash + Send + Clone,
189        E: Executor<BoxFuture<'static, ()>> + Send + Sync + 'static,
190    {
191        let svc = Balance::new(discover);
192
193        let svc = BoxService::new(svc);
194        let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size);
195        executor.execute(Box::pin(worker));
196
197        Channel { svc }
198    }
199}
200
201impl Service<http::Request<BoxBody>> for Channel {
202    type Response = http::Response<BoxBody>;
203    type Error = super::Error;
204    type Future = ResponseFuture;
205
206    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207        Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
208    }
209
210    fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
211        let inner = Service::call(&mut self.svc, request);
212
213        ResponseFuture { inner }
214    }
215}
216
217impl Future for ResponseFuture {
218    type Output = Result<Response<BoxBody>, super::Error>;
219
220    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
221        Pin::new(&mut self.inner)
222            .poll(cx)
223            .map_err(super::Error::from_source)
224    }
225}
226
227impl fmt::Debug for Channel {
228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229        f.debug_struct("Channel").finish()
230    }
231}
232
233impl fmt::Debug for ResponseFuture {
234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
235        f.debug_struct("ResponseFuture").finish()
236    }
237}