tonic/transport/channel/service/
discover.rs

1use super::super::{Connection, Endpoint};
2
3use hyper_util::client::legacy::connect::HttpConnector;
4use std::{
5    hash::Hash,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio::sync::mpsc::Receiver;
10
11use tokio_stream::Stream;
12use tower::discover::Change;
13
14type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
15
16pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
17    changes: Receiver<Change<K, Endpoint>>,
18}
19
20impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
21    pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
22        Self { changes }
23    }
24}
25
26impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
27    type Item = DiscoverResult<K, Connection, crate::Error>;
28
29    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30        let c = &mut self.changes;
31        match Pin::new(&mut *c).poll_recv(cx) {
32            Poll::Pending | Poll::Ready(None) => Poll::Pending,
33            Poll::Ready(Some(change)) => match change {
34                Change::Insert(k, endpoint) => {
35                    let mut http = HttpConnector::new();
36                    http.set_nodelay(endpoint.tcp_nodelay);
37                    http.set_keepalive(endpoint.tcp_keepalive);
38                    http.set_connect_timeout(endpoint.connect_timeout);
39                    http.enforce_http(false);
40
41                    let connection = Connection::lazy(endpoint.connector(http), endpoint);
42                    let change = Ok(Change::Insert(k, connection));
43                    Poll::Ready(Some(change))
44                }
45                Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
46            },
47        }
48    }
49}
50
51impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}