hyper_util/client/legacy/connect/
capture.rs

1use std::{ops::Deref, sync::Arc};
2
3use http::Request;
4use tokio::sync::watch;
5
6use super::Connected;
7
8/// [`CaptureConnection`] allows callers to capture [`Connected`] information
9///
10/// To capture a connection for a request, use [`capture_connection`].
11#[derive(Debug, Clone)]
12pub struct CaptureConnection {
13    rx: watch::Receiver<Option<Connected>>,
14}
15
16/// Capture the connection for a given request
17///
18/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
19/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
20/// as the connection is established.
21///
22/// [`Connection`]: crate::client::legacy::connect::Connection
23///
24/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
25///
26/// # Examples
27///
28/// **Synchronous access**:
29/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
30/// established. This is ideal for situations where you are certain the connection has already
31/// been established (e.g. after the response future has already completed).
32/// ```rust
33/// use hyper_util::client::legacy::connect::capture_connection;
34/// let mut request = http::Request::builder()
35///   .uri("http://foo.com")
36///   .body(())
37///   .unwrap();
38///
39/// let captured_connection = capture_connection(&mut request);
40/// // some time later after the request has been sent...
41/// let connection_info = captured_connection.connection_metadata();
42/// println!("we are connected! {:?}", connection_info.as_ref());
43/// ```
44///
45/// **Asynchronous access**:
46/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
47/// connection is available.
48///
49/// ```rust
50/// # #[cfg(feature  = "tokio")]
51/// # async fn example() {
52/// use hyper_util::client::legacy::connect::capture_connection;
53/// use hyper_util::client::legacy::Client;
54/// use hyper_util::rt::TokioExecutor;
55/// use bytes::Bytes;
56/// use http_body_util::Empty;
57/// let mut request = http::Request::builder()
58///   .uri("http://foo.com")
59///   .body(Empty::<Bytes>::new())
60///   .unwrap();
61///
62/// let mut captured = capture_connection(&mut request);
63/// tokio::task::spawn(async move {
64///     let connection_info = captured.wait_for_connection_metadata().await;
65///     println!("we are connected! {:?}", connection_info.as_ref());
66/// });
67///
68/// let client = Client::builder(TokioExecutor::new()).build_http();
69/// client.request(request).await.expect("request failed");
70/// # }
71/// ```
72pub fn capture_connection<B>(request: &mut Request<B>) -> CaptureConnection {
73    let (tx, rx) = CaptureConnection::new();
74    request.extensions_mut().insert(tx);
75    rx
76}
77
78/// TxSide for [`CaptureConnection`]
79///
80/// This is inserted into `Extensions` to allow Hyper to back channel connection info
81#[derive(Clone)]
82pub(crate) struct CaptureConnectionExtension {
83    tx: Arc<watch::Sender<Option<Connected>>>,
84}
85
86impl CaptureConnectionExtension {
87    pub(crate) fn set(&self, connected: &Connected) {
88        self.tx.send_replace(Some(connected.clone()));
89    }
90}
91
92impl CaptureConnection {
93    /// Internal API to create the tx and rx half of [`CaptureConnection`]
94    pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
95        let (tx, rx) = watch::channel(None);
96        (
97            CaptureConnectionExtension { tx: Arc::new(tx) },
98            CaptureConnection { rx },
99        )
100    }
101
102    /// Retrieve the connection metadata, if available
103    pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
104        self.rx.borrow()
105    }
106
107    /// Wait for the connection to be established
108    ///
109    /// If a connection was established, this will always return `Some(...)`. If the request never
110    /// successfully connected (e.g. DNS resolution failure), this method will never return.
111    pub async fn wait_for_connection_metadata(
112        &mut self,
113    ) -> impl Deref<Target = Option<Connected>> + '_ {
114        if self.rx.borrow().is_some() {
115            return self.rx.borrow();
116        }
117        let _ = self.rx.changed().await;
118        self.rx.borrow()
119    }
120}
121
#[cfg(all(test, not(miri)))]
mod test {
    use super::*;

    /// Metadata set through the sender is visible via the synchronous accessor, repeatedly.
    #[test]
    fn test_sync_capture_connection() {
        let (tx, rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );

        let proxied = Connected::new().proxy(true);
        tx.set(&proxied);

        // Read twice to ensure it can be called multiple times.
        for _ in 0..2 {
            let metadata = rx.connection_metadata();
            let connected = metadata.as_ref().expect("connected should be set");
            assert!(connected.is_proxied());
        }
    }

    /// A spawned waiter resolves only after the sender publishes the metadata.
    #[tokio::test]
    async fn async_capture_connection() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );

        let waiter = tokio::spawn(async move {
            // Guards are kept as temporaries so none of them lives across an await point.
            let is_proxied = rx
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied();
            assert!(is_proxied);

            // can be awaited multiple times
            let still_available = rx.wait_for_connection_metadata().await.is_some();
            assert!(still_available, "should be awaitable multiple times");

            assert!(rx.connection_metadata().is_some());
        });

        // can't be finished, we haven't set the connection yet
        assert!(!waiter.is_finished());
        tx.set(&Connected::new().proxy(true));

        assert!(waiter.await.is_ok());
    }

    /// Dropping the sender without publishing anything ends the wait with `None`.
    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );

        drop(tx);

        let metadata_missing = rx.wait_for_connection_metadata().await.is_none();
        assert!(metadata_missing);
    }
}