aws_smithy_runtime_api/client/
connection.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Types related to connection monitoring and management.
7
8use aws_smithy_types::config_bag::{Storable, StoreReplace};
9use std::fmt;
10use std::net::SocketAddr;
11use std::sync::{Arc, Mutex};
12
13/// Metadata that tracks the state of an active connection.
14#[derive(Clone)]
15pub struct ConnectionMetadata {
16    is_proxied: bool,
17    remote_addr: Option<SocketAddr>,
18    local_addr: Option<SocketAddr>,
19    poison_fn: Arc<dyn Fn() + Send + Sync>,
20}
21
22impl ConnectionMetadata {
23    /// Poison this connection, ensuring that it won't be reused.
24    pub fn poison(&self) {
25        tracing::info!(
26            see_for_more_info = "https://smithy-lang.github.io/smithy-rs/design/client/detailed_error_explanations.html",
27            "Connection encountered an issue and should not be re-used. Marking it for closure"
28        );
29        (self.poison_fn)()
30    }
31
32    /// Create a new [`ConnectionMetadata`].
33    #[deprecated(
34        since = "1.1.0",
35        note = "`ConnectionMetadata::new` is deprecated in favour of `ConnectionMetadata::builder`."
36    )]
37    pub fn new(
38        is_proxied: bool,
39        remote_addr: Option<SocketAddr>,
40        poison: impl Fn() + Send + Sync + 'static,
41    ) -> Self {
42        Self {
43            is_proxied,
44            remote_addr,
45            // need to use builder to set this field
46            local_addr: None,
47            poison_fn: Arc::new(poison),
48        }
49    }
50
51    /// Builder for this connection metadata
52    pub fn builder() -> ConnectionMetadataBuilder {
53        ConnectionMetadataBuilder::new()
54    }
55
56    /// Get the remote address for this connection, if one is set.
57    pub fn remote_addr(&self) -> Option<SocketAddr> {
58        self.remote_addr
59    }
60
61    /// Get the local address for this connection, if one is set.
62    pub fn local_addr(&self) -> Option<SocketAddr> {
63        self.local_addr
64    }
65}
66
67impl fmt::Debug for ConnectionMetadata {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        f.debug_struct("SmithyConnection")
70            .field("is_proxied", &self.is_proxied)
71            .field("remote_addr", &self.remote_addr)
72            .field("local_addr", &self.local_addr)
73            .finish()
74    }
75}
76
77/// Builder type that is used to construct a [`ConnectionMetadata`] value.
78#[derive(Default)]
79pub struct ConnectionMetadataBuilder {
80    is_proxied: Option<bool>,
81    remote_addr: Option<SocketAddr>,
82    local_addr: Option<SocketAddr>,
83    poison_fn: Option<Arc<dyn Fn() + Send + Sync>>,
84}
85
86impl fmt::Debug for ConnectionMetadataBuilder {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("ConnectionMetadataBuilder")
89            .field("is_proxied", &self.is_proxied)
90            .field("remote_addr", &self.remote_addr)
91            .field("local_addr", &self.local_addr)
92            .finish()
93    }
94}
95
96impl ConnectionMetadataBuilder {
97    /// Creates a new builder.
98    pub fn new() -> Self {
99        Self::default()
100    }
101
102    /// Set whether or not the associated connection is to an HTTP proxy.
103    pub fn proxied(mut self, proxied: bool) -> Self {
104        self.set_proxied(Some(proxied));
105        self
106    }
107
108    /// Set whether or not the associated connection is to an HTTP proxy.
109    pub fn set_proxied(&mut self, proxied: Option<bool>) -> &mut Self {
110        self.is_proxied = proxied;
111        self
112    }
113
114    /// Set the remote address of the connection used.
115    pub fn remote_addr(mut self, remote_addr: SocketAddr) -> Self {
116        self.set_remote_addr(Some(remote_addr));
117        self
118    }
119
120    /// Set the remote address of the connection used.
121    pub fn set_remote_addr(&mut self, remote_addr: Option<SocketAddr>) -> &mut Self {
122        self.remote_addr = remote_addr;
123        self
124    }
125
126    /// Set the local address of the connection used.
127    pub fn local_addr(mut self, local_addr: SocketAddr) -> Self {
128        self.set_local_addr(Some(local_addr));
129        self
130    }
131
132    /// Set the local address of the connection used.
133    pub fn set_local_addr(&mut self, local_addr: Option<SocketAddr>) -> &mut Self {
134        self.local_addr = local_addr;
135        self
136    }
137
138    /// Set a closure which will poison the associated connection.
139    ///
140    /// A poisoned connection will not be reused for subsequent requests by the pool
141    pub fn poison_fn(mut self, poison_fn: impl Fn() + Send + Sync + 'static) -> Self {
142        self.set_poison_fn(Some(poison_fn));
143        self
144    }
145
146    /// Set a closure which will poison the associated connection.
147    ///
148    /// A poisoned connection will not be reused for subsequent requests by the pool
149    pub fn set_poison_fn(
150        &mut self,
151        poison_fn: Option<impl Fn() + Send + Sync + 'static>,
152    ) -> &mut Self {
153        self.poison_fn =
154            poison_fn.map(|poison_fn| Arc::new(poison_fn) as Arc<dyn Fn() + Send + Sync>);
155        self
156    }
157
158    /// Build a [`ConnectionMetadata`] value.
159    ///
160    /// # Panics
161    ///
162    /// If either the `is_proxied` or `poison_fn` has not been set, then this method will panic
163    pub fn build(self) -> ConnectionMetadata {
164        ConnectionMetadata {
165            is_proxied: self
166                .is_proxied
167                .expect("is_proxied should be set for ConnectionMetadata"),
168            remote_addr: self.remote_addr,
169            local_addr: self.local_addr,
170            poison_fn: self
171                .poison_fn
172                .expect("poison_fn should be set for ConnectionMetadata"),
173        }
174    }
175}
176
177type LoaderFn = dyn Fn() -> Option<ConnectionMetadata> + Send + Sync;
178
179/// State for a middleware that will monitor and manage connections.
180#[derive(Clone, Default)]
181pub struct CaptureSmithyConnection {
182    loader: Arc<Mutex<Option<Box<LoaderFn>>>>,
183}
184
185impl CaptureSmithyConnection {
186    /// Create a new connection monitor.
187    pub fn new() -> Self {
188        Self {
189            loader: Default::default(),
190        }
191    }
192
193    /// Set the retriever that will capture the `hyper` connection.
194    pub fn set_connection_retriever<F>(&self, f: F)
195    where
196        F: Fn() -> Option<ConnectionMetadata> + Send + Sync + 'static,
197    {
198        *self.loader.lock().unwrap() = Some(Box::new(f));
199    }
200
201    /// Get the associated connection metadata.
202    pub fn get(&self) -> Option<ConnectionMetadata> {
203        match self.loader.lock().unwrap().as_ref() {
204            Some(loader) => loader(),
205            None => {
206                tracing::debug!("no loader was set on the CaptureSmithyConnection");
207                None
208            }
209        }
210    }
211}
212
213impl fmt::Debug for CaptureSmithyConnection {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "CaptureSmithyConnection")
216    }
217}
218
219impl Storable for CaptureSmithyConnection {
220    type Storer = StoreReplace<Self>;
221}
222
223#[cfg(test)]
224mod tests {
225    use std::{
226        net::{IpAddr, Ipv6Addr},
227        sync::Mutex,
228    };
229
230    use super::*;
231
232    const TEST_SOCKET_ADDR: SocketAddr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 100);
233
234    #[test]
235    #[should_panic]
236    fn builder_panic_missing_proxied() {
237        ConnectionMetadataBuilder::new()
238            .poison_fn(|| {})
239            .local_addr(TEST_SOCKET_ADDR)
240            .remote_addr(TEST_SOCKET_ADDR)
241            .build();
242    }
243
244    #[test]
245    #[should_panic]
246    fn builder_panic_missing_poison_fn() {
247        ConnectionMetadataBuilder::new()
248            .proxied(true)
249            .local_addr(TEST_SOCKET_ADDR)
250            .remote_addr(TEST_SOCKET_ADDR)
251            .build();
252    }
253
254    #[test]
255    fn builder_all_fields_successful() {
256        let mutable_flag = Arc::new(Mutex::new(false));
257
258        let connection_metadata = ConnectionMetadataBuilder::new()
259            .proxied(true)
260            .local_addr(TEST_SOCKET_ADDR)
261            .remote_addr(TEST_SOCKET_ADDR)
262            .poison_fn({
263                let mutable_flag = Arc::clone(&mutable_flag);
264                move || {
265                    let mut guard = mutable_flag.lock().unwrap();
266                    *guard = !*guard;
267                }
268            })
269            .build();
270
271        assert!(connection_metadata.is_proxied);
272        assert_eq!(connection_metadata.remote_addr(), Some(TEST_SOCKET_ADDR));
273        assert_eq!(connection_metadata.local_addr(), Some(TEST_SOCKET_ADDR));
274        assert!(!(*mutable_flag.lock().unwrap()));
275        connection_metadata.poison();
276        assert!(*mutable_flag.lock().unwrap());
277    }
278
279    #[test]
280    fn builder_optional_fields_translate() {
281        let metadata1 = ConnectionMetadataBuilder::new()
282            .proxied(true)
283            .poison_fn(|| {})
284            .build();
285
286        assert_eq!(metadata1.local_addr(), None);
287        assert_eq!(metadata1.remote_addr(), None);
288
289        let metadata2 = ConnectionMetadataBuilder::new()
290            .proxied(true)
291            .poison_fn(|| {})
292            .local_addr(TEST_SOCKET_ADDR)
293            .build();
294
295        assert_eq!(metadata2.local_addr(), Some(TEST_SOCKET_ADDR));
296        assert_eq!(metadata2.remote_addr(), None);
297
298        let metadata3 = ConnectionMetadataBuilder::new()
299            .proxied(true)
300            .poison_fn(|| {})
301            .remote_addr(TEST_SOCKET_ADDR)
302            .build();
303
304        assert_eq!(metadata3.local_addr(), None);
305        assert_eq!(metadata3.remote_addr(), Some(TEST_SOCKET_ADDR));
306    }
307
308    #[test]
309    #[allow(clippy::redundant_clone)]
310    fn retrieve_connection_metadata() {
311        let retriever = CaptureSmithyConnection::new();
312        let retriever_clone = retriever.clone();
313        assert!(retriever.get().is_none());
314        retriever.set_connection_retriever(|| {
315            Some(
316                ConnectionMetadata::builder()
317                    .proxied(true)
318                    .poison_fn(|| {})
319                    .build(),
320            )
321        });
322
323        assert!(retriever.get().is_some());
324        assert!(retriever_clone.get().is_some());
325    }
326}