tonic/transport/channel/service/
discover.rs
1use super::super::{Connection, Endpoint};
2
3use hyper_util::client::legacy::connect::HttpConnector;
4use std::{
5 hash::Hash,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio::sync::mpsc::Receiver;
10
11use tokio_stream::Stream;
12use tower::discover::Change;
13
14type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
15
16pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
17 changes: Receiver<Change<K, Endpoint>>,
18}
19
20impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
21 pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
22 Self { changes }
23 }
24}
25
26impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
27 type Item = DiscoverResult<K, Connection, crate::Error>;
28
29 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30 let c = &mut self.changes;
31 match Pin::new(&mut *c).poll_recv(cx) {
32 Poll::Pending | Poll::Ready(None) => Poll::Pending,
33 Poll::Ready(Some(change)) => match change {
34 Change::Insert(k, endpoint) => {
35 let mut http = HttpConnector::new();
36 http.set_nodelay(endpoint.tcp_nodelay);
37 http.set_keepalive(endpoint.tcp_keepalive);
38 http.set_connect_timeout(endpoint.connect_timeout);
39 http.enforce_http(false);
40
41 let connection = Connection::lazy(endpoint.connector(http), endpoint);
42 let change = Ok(Change::Insert(k, connection));
43 Poll::Ready(Some(change))
44 }
45 Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
46 },
47 }
48 }
49}
50
51impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}