1use crate::hyper_legacy::timeout_middleware::HttpTimeoutError;
7use aws_smithy_async::future::timeout::TimedOutError;
8use aws_smithy_async::rt::sleep::{default_async_sleep, AsyncSleep, SharedAsyncSleep};
9use aws_smithy_runtime_api::box_error::BoxError;
10use aws_smithy_runtime_api::client::connection::CaptureSmithyConnection;
11use aws_smithy_runtime_api::client::connection::ConnectionMetadata;
12use aws_smithy_runtime_api::client::connector_metadata::ConnectorMetadata;
13use aws_smithy_runtime_api::client::http::{
14    HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpClient,
15    SharedHttpConnector,
16};
17use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, HttpResponse};
18use aws_smithy_runtime_api::client::result::ConnectorError;
19use aws_smithy_runtime_api::client::runtime_components::{
20    RuntimeComponents, RuntimeComponentsBuilder,
21};
22use aws_smithy_runtime_api::shared::IntoShared;
23use aws_smithy_types::body::SdkBody;
24use aws_smithy_types::config_bag::ConfigBag;
25use aws_smithy_types::error::display::DisplayErrorContext;
26use aws_smithy_types::retry::ErrorKind;
27use h2_0_3::Reason;
28use hyper_0_14::client::connect::{capture_connection, CaptureConnection, Connection, HttpInfo};
29use std::borrow::Cow;
30use std::collections::HashMap;
31use std::error::Error;
32use std::fmt;
33use std::sync::RwLock;
34use std::time::Duration;
35use tokio::io::{AsyncRead, AsyncWrite};
36
/// Pieces used to assemble the default HTTPS connector.
///
/// Only compiled when the `legacy-rustls-ring` feature is enabled.
#[cfg(feature = "legacy-rustls-ring")]
mod default_connector {
    use aws_smithy_async::rt::sleep::SharedAsyncSleep;
    use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
    use legacy_hyper_rustls as hyper_rustls;
    use legacy_rustls as rustls;
    use std::sync::LazyLock;

    /// Lazily-initialized HTTPS connector shared by every caller of [`https`].
    ///
    /// It is built by `default_tls` the first time the static is dereferenced.
    pub(crate) static HTTPS_NATIVE_ROOTS: LazyLock<
        hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector>,
    > = LazyLock::new(default_tls);

    /// Builds the HTTPS connector stored in [`HTTPS_NATIVE_ROOTS`].
    ///
    /// The rustls client config is restricted to the cipher suites listed below, uses
    /// native roots (`with_native_roots`) and no client auth. The resulting connector
    /// accepts both `https` and `http` URIs and enables HTTP/1 and HTTP/2.
    fn default_tls() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
        use hyper_rustls::ConfigBuilderExt;

        let tls_config = rustls::ClientConfig::builder()
            .with_cipher_suites(&[
                // TLS 1.3 suites
                rustls::cipher_suite::TLS13_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS13_AES_128_GCM_SHA256,
                // ECDHE suites
                rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
                rustls::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256,
            ])
            .with_safe_default_kx_groups()
            .with_safe_default_protocol_versions()
            .expect("Error with the TLS configuration. Please file a bug report under https://github.com/smithy-lang/smithy-rs/issues.")
            .with_native_roots()
            .with_no_client_auth();

        hyper_rustls::HttpsConnectorBuilder::new()
            .with_tls_config(tls_config)
            .https_or_http()
            .enable_http1()
            .enable_http2()
            .build()
    }

    /// Starts a [`HyperConnectorBuilder`](super::HyperConnectorBuilder) carrying a copy of
    /// `settings` and, when one is given, the `sleep` implementation.
    pub(super) fn base(
        settings: &HttpConnectorSettings,
        sleep: Option<SharedAsyncSleep>,
    ) -> super::HyperConnectorBuilder {
        let builder = super::HyperConnector::builder().connector_settings(settings.clone());
        match sleep {
            Some(sleep) => builder.sleep_impl(sleep),
            None => builder,
        }
    }

    /// Returns a clone of the shared default HTTPS connector.
    pub(super) fn https() -> hyper_rustls::HttpsConnector<hyper_0_14::client::HttpConnector> {
        (*HTTPS_NATIVE_ROOTS).clone()
    }
}
98
99pub fn default_connector(
101    settings: &HttpConnectorSettings,
102    sleep: Option<SharedAsyncSleep>,
103) -> Option<SharedHttpConnector> {
104    #[cfg(feature = "legacy-rustls-ring")]
105    {
106        tracing::trace!(settings = ?settings, sleep = ?sleep, "creating a new default connector");
107        let hyper = default_connector::base(settings, sleep).build_https();
108        Some(SharedHttpConnector::new(hyper))
109    }
110    #[cfg(not(feature = "legacy-rustls-ring"))]
111    {
112        tracing::trace!(settings = ?settings, sleep = ?sleep, "no default connector available");
113        None
114    }
115}
116
117pub fn default_client() -> Option<SharedHttpClient> {
119    #[cfg(feature = "legacy-rustls-ring")]
120    {
121        tracing::trace!("creating a new default hyper 0.14.x client");
122        Some(HyperClientBuilder::new().build_https())
123    }
124    #[cfg(not(feature = "legacy-rustls-ring"))]
125    {
126        tracing::trace!("no default connector available");
127        None
128    }
129}
130
131#[derive(Debug)]
139pub struct HyperConnector {
140    adapter: Box<dyn HttpConnector>,
141}
142
143impl HyperConnector {
144    pub fn builder() -> HyperConnectorBuilder {
146        Default::default()
147    }
148}
149
150impl HttpConnector for HyperConnector {
151    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
152        self.adapter.call(request)
153    }
154}
155
156#[derive(Default, Debug)]
158pub struct HyperConnectorBuilder {
159    connector_settings: Option<HttpConnectorSettings>,
160    sleep_impl: Option<SharedAsyncSleep>,
161    client_builder: Option<hyper_0_14::client::Builder>,
162}
163
164impl HyperConnectorBuilder {
165    pub fn build<C>(self, tcp_connector: C) -> HyperConnector
167    where
168        C: Clone + Send + Sync + 'static,
169        C: hyper_0_14::service::Service<http_02x::Uri>,
170        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
171        C::Future: Unpin + Send + 'static,
172        C::Error: Into<BoxError>,
173    {
174        let client_builder = self.client_builder.unwrap_or_default();
175        let sleep_impl = self.sleep_impl.or_else(default_async_sleep);
176        let (connect_timeout, read_timeout) = self
177            .connector_settings
178            .map(|c| (c.connect_timeout(), c.read_timeout()))
179            .unwrap_or((None, None));
180
181        let connector = match connect_timeout {
182            Some(duration) => timeout_middleware::ConnectTimeout::new(
183                tcp_connector,
184                sleep_impl
185                    .clone()
186                    .expect("a sleep impl must be provided in order to have a connect timeout"),
187                duration,
188            ),
189            None => timeout_middleware::ConnectTimeout::no_timeout(tcp_connector),
190        };
191        let base = client_builder.build(connector);
192        let read_timeout = match read_timeout {
193            Some(duration) => timeout_middleware::HttpReadTimeout::new(
194                base,
195                sleep_impl.expect("a sleep impl must be provided in order to have a read timeout"),
196                duration,
197            ),
198            None => timeout_middleware::HttpReadTimeout::no_timeout(base),
199        };
200        HyperConnector {
201            adapter: Box::new(Adapter {
202                client: read_timeout,
203            }),
204        }
205    }
206
207    #[cfg(feature = "legacy-rustls-ring")]
209    pub fn build_https(self) -> HyperConnector {
210        self.build(default_connector::https())
211    }
212
213    pub fn sleep_impl(mut self, sleep_impl: impl AsyncSleep + 'static) -> Self {
218        self.sleep_impl = Some(sleep_impl.into_shared());
219        self
220    }
221
222    pub fn set_sleep_impl(&mut self, sleep_impl: Option<SharedAsyncSleep>) -> &mut Self {
227        self.sleep_impl = sleep_impl;
228        self
229    }
230
231    pub fn connector_settings(mut self, connector_settings: HttpConnectorSettings) -> Self {
233        self.connector_settings = Some(connector_settings);
234        self
235    }
236
237    pub fn set_connector_settings(
239        &mut self,
240        connector_settings: Option<HttpConnectorSettings>,
241    ) -> &mut Self {
242        self.connector_settings = connector_settings;
243        self
244    }
245
246    pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
250        self.client_builder = Some(hyper_builder);
251        self
252    }
253
254    pub fn set_hyper_builder(
258        &mut self,
259        hyper_builder: Option<hyper_0_14::client::Builder>,
260    ) -> &mut Self {
261        self.client_builder = hyper_builder;
262        self
263    }
264}
265
266struct Adapter<C> {
270    client: timeout_middleware::HttpReadTimeout<
271        hyper_0_14::Client<timeout_middleware::ConnectTimeout<C>, SdkBody>,
272    >,
273}
274
275impl<C> fmt::Debug for Adapter<C> {
276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
277        f.debug_struct("Adapter")
278            .field("client", &"** hyper client **")
279            .finish()
280    }
281}
282
283fn extract_smithy_connection(capture_conn: &CaptureConnection) -> Option<ConnectionMetadata> {
285    let capture_conn = capture_conn.clone();
286    if let Some(conn) = capture_conn.clone().connection_metadata().as_ref() {
287        let mut extensions = http_02x::Extensions::new();
288        conn.get_extras(&mut extensions);
289        let http_info = extensions.get::<HttpInfo>();
290        let mut builder = ConnectionMetadata::builder()
291            .proxied(conn.is_proxied())
292            .poison_fn(move || match capture_conn.connection_metadata().as_ref() {
293                Some(conn) => conn.poison(),
294                None => tracing::trace!("no connection existed to poison"),
295            });
296
297        builder
298            .set_local_addr(http_info.map(|info| info.local_addr()))
299            .set_remote_addr(http_info.map(|info| info.remote_addr()));
300
301        let smithy_connection = builder.build();
302
303        Some(smithy_connection)
304    } else {
305        None
306    }
307}
308
309impl<C> HttpConnector for Adapter<C>
310where
311    C: Clone + Send + Sync + 'static,
312    C: hyper_0_14::service::Service<http_02x::Uri>,
313    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
314    C::Future: Unpin + Send + 'static,
315    C::Error: Into<BoxError>,
316{
317    fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
318        use hyper_0_14::service::Service;
319
320        let mut request = match request.try_into_http02x() {
321            Ok(request) => request,
322            Err(err) => {
323                return HttpConnectorFuture::ready(Err(ConnectorError::other(err.into(), None)));
324            }
325        };
326        let capture_connection = capture_connection(&mut request);
327        if let Some(capture_smithy_connection) =
328            request.extensions().get::<CaptureSmithyConnection>()
329        {
330            capture_smithy_connection
331                .set_connection_retriever(move || extract_smithy_connection(&capture_connection));
332        }
333        let mut client = self.client.clone();
334        let fut = client.call(request);
335        HttpConnectorFuture::new(async move {
336            let response = fut
337                .await
338                .map_err(downcast_error)?
339                .map(SdkBody::from_body_0_4);
340            match HttpResponse::try_from(response) {
341                Ok(response) => Ok(response),
342                Err(err) => Err(ConnectorError::other(err.into(), None)),
343            }
344        })
345    }
346}
347
348fn downcast_error(err: BoxError) -> ConnectorError {
350    if find_source::<TimedOutError>(err.as_ref()).is_some() {
352        return ConnectorError::timeout(err);
353    }
354    let err = match err.downcast::<ConnectorError>() {
356        Ok(connector_error) => return *connector_error,
357        Err(box_error) => box_error,
358    };
359    let err = match err.downcast::<hyper_0_14::Error>() {
362        Ok(hyper_error) => return to_connector_error(*hyper_error),
363        Err(box_error) => box_error,
364    };
365
366    ConnectorError::other(err, None)
368}
369
370fn to_connector_error(err: hyper_0_14::Error) -> ConnectorError {
372    if err.is_timeout() || find_source::<HttpTimeoutError>(&err).is_some() {
373        return ConnectorError::timeout(err.into());
374    }
375    if err.is_user() {
376        return ConnectorError::user(err.into());
377    }
378    if err.is_closed() || err.is_canceled() || find_source::<std::io::Error>(&err).is_some() {
379        return ConnectorError::io(err.into());
380    }
381    if err.is_incomplete_message() {
383        return ConnectorError::other(err.into(), Some(ErrorKind::TransientError));
384    }
385    if let Some(h2_err) = find_source::<h2_0_3::Error>(&err) {
386        if h2_err.is_go_away()
387            || (h2_err.is_reset() && h2_err.reason() == Some(Reason::REFUSED_STREAM))
388        {
389            return ConnectorError::io(err.into());
390        }
391    }
392
393    tracing::warn!(err = %DisplayErrorContext(&err), "unrecognized error from Hyper. If this error should be retried, please file an issue.");
394    ConnectorError::other(err.into(), None)
395}
396
397fn find_source<'a, E: Error + 'static>(err: &'a (dyn Error + 'static)) -> Option<&'a E> {
398    let mut next = Some(err);
399    while let Some(err) = next {
400        if let Some(matching_err) = err.downcast_ref::<E>() {
401            return Some(matching_err);
402        }
403        next = err.source();
404    }
405    None
406}
407
408#[derive(Clone, Debug, Eq, PartialEq, Hash)]
409struct CacheKey {
410    connect_timeout: Option<Duration>,
411    read_timeout: Option<Duration>,
412}
413
414impl From<&HttpConnectorSettings> for CacheKey {
415    fn from(value: &HttpConnectorSettings) -> Self {
416        Self {
417            connect_timeout: value.connect_timeout(),
418            read_timeout: value.read_timeout(),
419        }
420    }
421}
422
423struct HyperClient<F> {
424    connector_cache: RwLock<HashMap<CacheKey, SharedHttpConnector>>,
425    client_builder: hyper_0_14::client::Builder,
426    tcp_connector_fn: F,
427}
428
429impl<F> fmt::Debug for HyperClient<F> {
430    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431        f.debug_struct("HyperClient")
432            .field("connector_cache", &self.connector_cache)
433            .field("client_builder", &self.client_builder)
434            .finish()
435    }
436}
437
438impl<C, F> HttpClient for HyperClient<F>
439where
440    F: Fn() -> C + Send + Sync,
441    C: Clone + Send + Sync + 'static,
442    C: hyper_0_14::service::Service<http_02x::Uri>,
443    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
444    C::Future: Unpin + Send + 'static,
445    C::Error: Into<BoxError>,
446{
447    fn http_connector(
448        &self,
449        settings: &HttpConnectorSettings,
450        components: &RuntimeComponents,
451    ) -> SharedHttpConnector {
452        let key = CacheKey::from(settings);
453        let mut connector = self.connector_cache.read().unwrap().get(&key).cloned();
454        if connector.is_none() {
455            let mut cache = self.connector_cache.write().unwrap();
456            if !cache.contains_key(&key) {
458                let mut builder = HyperConnector::builder()
459                    .hyper_builder(self.client_builder.clone())
460                    .connector_settings(settings.clone());
461                builder.set_sleep_impl(components.sleep_impl());
462
463                let start = components.time_source().map(|ts| ts.now());
464                let tcp_connector = (self.tcp_connector_fn)();
465                let end = components.time_source().map(|ts| ts.now());
466                if let (Some(start), Some(end)) = (start, end) {
467                    if let Ok(elapsed) = end.duration_since(start) {
468                        tracing::debug!("new TCP connector created in {:?}", elapsed);
469                    }
470                }
471                let connector = SharedHttpConnector::new(builder.build(tcp_connector));
472                cache.insert(key.clone(), connector);
473            }
474            connector = cache.get(&key).cloned();
475        }
476
477        connector.expect("cache populated above")
478    }
479
480    fn validate_base_client_config(
481        &self,
482        _: &RuntimeComponentsBuilder,
483        _: &ConfigBag,
484    ) -> Result<(), BoxError> {
485        let _ = (self.tcp_connector_fn)();
491        Ok(())
492    }
493
494    fn connector_metadata(&self) -> Option<ConnectorMetadata> {
495        Some(ConnectorMetadata::new("hyper", Some(Cow::Borrowed("0.x"))))
496    }
497}
498
499#[derive(Clone, Default, Debug)]
553pub struct HyperClientBuilder {
554    client_builder: Option<hyper_0_14::client::Builder>,
555}
556
557impl HyperClientBuilder {
558    pub fn new() -> Self {
560        Self::default()
561    }
562
563    pub fn hyper_builder(mut self, hyper_builder: hyper_0_14::client::Builder) -> Self {
567        self.client_builder = Some(hyper_builder);
568        self
569    }
570
571    pub fn set_hyper_builder(
575        &mut self,
576        hyper_builder: Option<hyper_0_14::client::Builder>,
577    ) -> &mut Self {
578        self.client_builder = hyper_builder;
579        self
580    }
581
582    #[cfg(feature = "legacy-rustls-ring")]
587    pub fn build_https(self) -> SharedHttpClient {
588        self.build_with_fn(default_connector::https)
589    }
590
591    #[cfg_attr(
594        feature = "legacy-rustls-ring",
595        doc = "Use [`build_https`](HyperClientBuilder::build_https) if you don't want to provide a custom TCP connector."
596    )]
597    pub fn build<C>(self, tcp_connector: C) -> SharedHttpClient
598    where
599        C: Clone + Send + Sync + 'static,
600        C: hyper_0_14::service::Service<http_02x::Uri>,
601        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
602        C::Future: Unpin + Send + 'static,
603        C::Error: Into<BoxError>,
604    {
605        self.build_with_fn(move || tcp_connector.clone())
606    }
607
608    fn build_with_fn<C, F>(self, tcp_connector_fn: F) -> SharedHttpClient
609    where
610        F: Fn() -> C + Send + Sync + 'static,
611        C: Clone + Send + Sync + 'static,
612        C: hyper_0_14::service::Service<http_02x::Uri>,
613        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin + 'static,
614        C::Future: Unpin + Send + 'static,
615        C::Error: Into<BoxError>,
616    {
617        SharedHttpClient::new(HyperClient {
618            connector_cache: RwLock::new(HashMap::new()),
619            client_builder: self.client_builder.unwrap_or_default(),
620            tcp_connector_fn,
621        })
622    }
623}
624
625mod timeout_middleware {
626    use aws_smithy_async::future::timeout::{TimedOutError, Timeout};
627    use aws_smithy_async::rt::sleep::Sleep;
628    use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep};
629    use aws_smithy_runtime_api::box_error::BoxError;
630    use pin_project_lite::pin_project;
631    use std::error::Error;
632    use std::fmt::Formatter;
633    use std::future::Future;
634    use std::pin::Pin;
635    use std::task::{Context, Poll};
636    use std::time::Duration;
637
638    #[derive(Debug)]
639    pub(crate) struct HttpTimeoutError {
640        kind: &'static str,
641        duration: Duration,
642    }
643
644    impl std::fmt::Display for HttpTimeoutError {
645        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
646            write!(
647                f,
648                "{} timeout occurred after {:?}",
649                self.kind, self.duration
650            )
651        }
652    }
653
654    impl Error for HttpTimeoutError {
655        fn source(&self) -> Option<&(dyn Error + 'static)> {
659            Some(&TimedOutError)
660        }
661    }
662
663    #[derive(Clone, Debug)]
668    pub(super) struct ConnectTimeout<I> {
669        inner: I,
670        timeout: Option<(SharedAsyncSleep, Duration)>,
671    }
672
673    impl<I> ConnectTimeout<I> {
674        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
678            Self {
679                inner,
680                timeout: Some((sleep, timeout)),
681            }
682        }
683
684        pub(crate) fn no_timeout(inner: I) -> Self {
685            Self {
686                inner,
687                timeout: None,
688            }
689        }
690    }
691
692    #[derive(Clone, Debug)]
693    pub(crate) struct HttpReadTimeout<I> {
694        inner: I,
695        timeout: Option<(SharedAsyncSleep, Duration)>,
696    }
697
698    impl<I> HttpReadTimeout<I> {
699        pub(crate) fn new(inner: I, sleep: SharedAsyncSleep, timeout: Duration) -> Self {
703            Self {
704                inner,
705                timeout: Some((sleep, timeout)),
706            }
707        }
708
709        pub(crate) fn no_timeout(inner: I) -> Self {
710            Self {
711                inner,
712                timeout: None,
713            }
714        }
715    }
716
    pin_project! {
        #[project = MaybeTimeoutFutureProj]
        /// Future returned by the timeout services in this module.
        ///
        /// It either races the wrapped future against a sleep (`Timeout`) or simply
        /// forwards to the wrapped future (`NoTimeout`).
        pub enum MaybeTimeoutFuture<F> {
            // A timeout is configured: `timeout` combines the wrapped future with the
            // sleep, while `error_type` and `duration` are used to build the
            // `HttpTimeoutError` if the timeout wins.
            Timeout {
                #[pin]
                timeout: Timeout<F, Sleep>,
                error_type: &'static str,
                duration: Duration,
            },
            // No timeout is configured: the wrapped future is polled directly.
            NoTimeout {
                #[pin]
                future: F
            }
        }
    }
736
737    impl<F, T, E> Future for MaybeTimeoutFuture<F>
738    where
739        F: Future<Output = Result<T, E>>,
740        E: Into<BoxError>,
741    {
742        type Output = Result<T, BoxError>;
743
744        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
745            let (timeout_future, kind, &mut duration) = match self.project() {
746                MaybeTimeoutFutureProj::NoTimeout { future } => {
747                    return future.poll(cx).map_err(|err| err.into());
748                }
749                MaybeTimeoutFutureProj::Timeout {
750                    timeout,
751                    error_type,
752                    duration,
753                } => (timeout, error_type, duration),
754            };
755            match timeout_future.poll(cx) {
756                Poll::Ready(Ok(response)) => Poll::Ready(response.map_err(|err| err.into())),
757                Poll::Ready(Err(_timeout)) => {
758                    Poll::Ready(Err(HttpTimeoutError { kind, duration }.into()))
759                }
760                Poll::Pending => Poll::Pending,
761            }
762        }
763    }
764
765    impl<I> hyper_0_14::service::Service<http_02x::Uri> for ConnectTimeout<I>
766    where
767        I: hyper_0_14::service::Service<http_02x::Uri>,
768        I::Error: Into<BoxError>,
769    {
770        type Response = I::Response;
771        type Error = BoxError;
772        type Future = MaybeTimeoutFuture<I::Future>;
773
774        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
775            self.inner.poll_ready(cx).map_err(|err| err.into())
776        }
777
778        fn call(&mut self, req: http_02x::Uri) -> Self::Future {
779            match &self.timeout {
780                Some((sleep, duration)) => {
781                    let sleep = sleep.sleep(*duration);
782                    MaybeTimeoutFuture::Timeout {
783                        timeout: Timeout::new(self.inner.call(req), sleep),
784                        error_type: "HTTP connect",
785                        duration: *duration,
786                    }
787                }
788                None => MaybeTimeoutFuture::NoTimeout {
789                    future: self.inner.call(req),
790                },
791            }
792        }
793    }
794
795    impl<I, B> hyper_0_14::service::Service<http_02x::Request<B>> for HttpReadTimeout<I>
796    where
797        I: hyper_0_14::service::Service<http_02x::Request<B>, Error = hyper_0_14::Error>,
798    {
799        type Response = I::Response;
800        type Error = BoxError;
801        type Future = MaybeTimeoutFuture<I::Future>;
802
803        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
804            self.inner.poll_ready(cx).map_err(|err| err.into())
805        }
806
807        fn call(&mut self, req: http_02x::Request<B>) -> Self::Future {
808            match &self.timeout {
809                Some((sleep, duration)) => {
810                    let sleep = sleep.sleep(*duration);
811                    MaybeTimeoutFuture::Timeout {
812                        timeout: Timeout::new(self.inner.call(req), sleep),
813                        error_type: "HTTP read",
814                        duration: *duration,
815                    }
816                }
817                None => MaybeTimeoutFuture::NoTimeout {
818                    future: self.inner.call(req),
819                },
820            }
821        }
822    }
823
824    #[cfg(test)]
825    pub(crate) mod test {
826        use crate::hyper_014::HyperConnector;
827        use aws_smithy_async::assert_elapsed;
828        use aws_smithy_async::future::never::Never;
829        use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep};
830        use aws_smithy_runtime_api::box_error::BoxError;
831        use aws_smithy_runtime_api::client::http::HttpConnectorSettings;
832        use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
833        use aws_smithy_runtime_api::client::result::ConnectorError;
834        use aws_smithy_types::error::display::DisplayErrorContext;
835        use hyper_0_14::client::connect::{Connected, Connection};
836        use std::future::Future;
837        use std::pin::Pin;
838        use std::task::{Context, Poll};
839        use std::time::Duration;
840        use tokio::io::ReadBuf;
841        use tokio::io::{AsyncRead, AsyncWrite};
842        use tokio::net::TcpStream;
843
844        #[allow(unused)]
845        fn connect_timeout_is_correct<T: Send + Sync + Clone + 'static>() {
846            is_send_sync::<super::ConnectTimeout<T>>();
847        }
848
849        #[allow(unused)]
850        fn is_send_sync<T: Send + Sync>() {}
851
852        #[non_exhaustive]
856        #[derive(Clone, Default, Debug)]
857        pub(crate) struct NeverConnects;
858        impl hyper_0_14::service::Service<http_02x::Uri> for NeverConnects {
859            type Response = TcpStream;
860            type Error = ConnectorError;
861            type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
862
863            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
864                Poll::Ready(Ok(()))
865            }
866
867            fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
868                Box::pin(async move {
869                    Never::new().await;
870                    unreachable!()
871                })
872            }
873        }
874
875        #[derive(Clone, Debug, Default)]
877        struct NeverReplies;
878        impl hyper_0_14::service::Service<http_02x::Uri> for NeverReplies {
879            type Response = EmptyStream;
880            type Error = BoxError;
881            type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
882
883            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
884                Poll::Ready(Ok(()))
885            }
886
887            fn call(&mut self, _req: http_02x::Uri) -> Self::Future {
888                std::future::ready(Ok(EmptyStream))
889            }
890        }
891
892        #[non_exhaustive]
894        #[derive(Debug, Default)]
895        struct EmptyStream;
896        impl AsyncRead for EmptyStream {
897            fn poll_read(
898                self: Pin<&mut Self>,
899                _cx: &mut Context<'_>,
900                _buf: &mut ReadBuf<'_>,
901            ) -> Poll<std::io::Result<()>> {
902                Poll::Pending
903            }
904        }
905        impl AsyncWrite for EmptyStream {
906            fn poll_write(
907                self: Pin<&mut Self>,
908                _cx: &mut Context<'_>,
909                _buf: &[u8],
910            ) -> Poll<Result<usize, std::io::Error>> {
911                Poll::Pending
912            }
913
914            fn poll_flush(
915                self: Pin<&mut Self>,
916                _cx: &mut Context<'_>,
917            ) -> Poll<Result<(), std::io::Error>> {
918                Poll::Pending
919            }
920
921            fn poll_shutdown(
922                self: Pin<&mut Self>,
923                _cx: &mut Context<'_>,
924            ) -> Poll<Result<(), std::io::Error>> {
925                Poll::Pending
926            }
927        }
928        impl Connection for EmptyStream {
929            fn connected(&self) -> Connected {
930                Connected::new()
931            }
932        }
933
934        #[tokio::test]
935        async fn http_connect_timeout_works() {
936            let tcp_connector = NeverConnects::default();
937            let connector_settings = HttpConnectorSettings::builder()
938                .connect_timeout(Duration::from_secs(1))
939                .build();
940            let hyper = HyperConnector::builder()
941                .connector_settings(connector_settings)
942                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
943                .build(tcp_connector)
944                .adapter;
945            let now = tokio::time::Instant::now();
946            tokio::time::pause();
947            let resp = hyper
948                .call(HttpRequest::get("https://static-uri.com").unwrap())
949                .await
950                .unwrap_err();
951            assert!(
952                resp.is_timeout(),
953                "expected resp.is_timeout() to be true but it was false, resp == {:?}",
954                resp
955            );
956            let message = DisplayErrorContext(&resp).to_string();
957            let expected =
958                "timeout: error trying to connect: HTTP connect timeout occurred after 1s";
959            assert!(
960                message.contains(expected),
961                "expected '{message}' to contain '{expected}'"
962            );
963            assert_elapsed!(now, Duration::from_secs(1));
964        }
965
966        #[tokio::test]
967        async fn http_read_timeout_works() {
968            let tcp_connector = NeverReplies;
969            let connector_settings = HttpConnectorSettings::builder()
970                .connect_timeout(Duration::from_secs(1))
971                .read_timeout(Duration::from_secs(2))
972                .build();
973            let hyper = HyperConnector::builder()
974                .connector_settings(connector_settings)
975                .sleep_impl(SharedAsyncSleep::new(TokioSleep::new()))
976                .build(tcp_connector)
977                .adapter;
978            let now = tokio::time::Instant::now();
979            tokio::time::pause();
980            let err = hyper
981                .call(HttpRequest::get("https://fake-uri.com").unwrap())
982                .await
983                .unwrap_err();
984            assert!(
985                err.is_timeout(),
986                "expected err.is_timeout() to be true but it was false, err == {err:?}",
987            );
988            let message = format!("{}", DisplayErrorContext(&err));
989            let expected = "timeout: HTTP read timeout occurred after 2s";
990            assert!(
991                message.contains(expected),
992                "expected '{message}' to contain '{expected}'"
993            );
994            assert_elapsed!(now, Duration::from_secs(2));
995        }
996    }
997}
998
#[cfg(test)]
mod test {
    // Tests for `HyperClientBuilder::build_with_fn` connector creation counts and for
    // I/O error reporting through `HyperConnector`, plus the stub stream and connector
    // service those tests rely on.
    use crate::hyper_legacy::timeout_middleware::test::NeverConnects;
    use crate::hyper_legacy::{HyperClientBuilder, HyperConnector};
    use aws_smithy_async::time::SystemTimeSource;
    use aws_smithy_runtime_api::box_error::BoxError;
    use aws_smithy_runtime_api::client::http::{HttpClient, HttpConnectorSettings};
    use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
    use aws_smithy_runtime_api::client::runtime_components::RuntimeComponentsBuilder;
    use hyper_0_14::client::connect::{Connected, Connection};
    use std::io::{Error, ErrorKind};
    use std::pin::Pin;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::task::{Context, Poll};
    use std::time::Duration;
    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

    /// Requests a connector 1000 times per settings value, from concurrently spawned
    /// tasks, for four distinct `HttpConnectorSettings` values, and asserts that the
    /// factory closure given to `build_with_fn` ran exactly four times.
    #[tokio::test]
    async fn connector_selection() {
        // Every invocation of the factory closure bumps this shared counter.
        let creation_count = Arc::new(AtomicU32::new(0));
        let factory_count = Arc::clone(&creation_count);
        let http_client = HyperClientBuilder::new().build_with_fn(move || {
            factory_count.fetch_add(1, Ordering::Relaxed);
            NeverConnects::default()
        });

        // Four settings values that differ in which timeouts are set and to what.
        let three_secs = Duration::from_secs(3);
        let five_secs = Duration::from_secs(5);
        let settings = [
            HttpConnectorSettings::builder()
                .connect_timeout(three_secs)
                .build(),
            HttpConnectorSettings::builder()
                .read_timeout(three_secs)
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(three_secs)
                .read_timeout(three_secs)
                .build(),
            HttpConnectorSettings::builder()
                .connect_timeout(five_secs)
                .read_timeout(three_secs)
                .build(),
        ];

        let components = RuntimeComponentsBuilder::for_tests()
            .with_time_source(Some(SystemTimeSource::new()))
            .build()
            .unwrap();

        // One task per (settings value, repetition) pair, spawned in the same order as
        // a nested loop over settings and then repetitions. The returned connector is
        // discarded; only the number of factory invocations is checked.
        let tasks: Vec<_> = settings
            .iter()
            .flat_map(|setting| std::iter::repeat(setting).take(1000))
            .map(|setting| {
                let client = http_client.clone();
                let setting = setting.clone();
                let components = components.clone();
                tokio::spawn(async move {
                    let _ = client.http_connector(&setting, &components);
                })
            })
            .collect();
        for task in tasks {
            task.await.unwrap();
        }

        let created = creation_count.load(Ordering::Relaxed);
        assert_eq!(4, created);
    }

    /// Sends a request through a connector whose stream fails every read with
    /// `ConnectionReset` and asserts that the resulting error reports `is_io()`.
    #[tokio::test]
    async fn hyper_io_error() {
        let adapter = HyperConnector::builder()
            .build(TestConnection {
                inner: HangupStream,
            })
            .adapter;
        let request = HttpRequest::get("https://socket-hangup.com").unwrap();
        let err = adapter.call(request).await.expect_err("socket hangup");
        assert!(err.is_io(), "{:?}", err);
    }

    /// Stub stream: reads fail immediately with `ConnectionReset`, while writes,
    /// flushes and shutdowns always return `Poll::Pending`.
    #[derive(Clone)]
    struct HangupStream;

    impl Connection for HangupStream {
        fn connected(&self) -> Connected {
            Connected::new()
        }
    }

    impl AsyncRead for HangupStream {
        /// Always completes with a `ConnectionReset` I/O error; the buffer is untouched.
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let reset = Error::new(ErrorKind::ConnectionReset, "connection reset");
            Poll::Ready(Err(reset))
        }
    }

    impl AsyncWrite for HangupStream {
        /// Never accepts any bytes: always `Pending`.
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            Poll::Pending
        }

        /// Never finishes flushing: always `Pending`.
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Pending
        }

        /// Never finishes shutting down: always `Pending`.
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Pending
        }
    }

    /// Connector service that answers every URI with a clone of `inner`.
    #[derive(Clone)]
    struct TestConnection<T> {
        inner: T,
    }

    impl<T: Clone + Connection> hyper_0_14::service::Service<http_02x::Uri> for TestConnection<T> {
        type Response = T;
        type Error = BoxError;
        type Future = std::future::Ready<Result<T, BoxError>>;

        /// Always ready to accept a call.
        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
            Poll::Ready(Ok(()))
        }

        /// Ignores the URI and resolves immediately to a clone of the wrapped value.
        fn call(&mut self, _uri: http_02x::Uri) -> Self::Future {
            let stream = self.inner.clone();
            std::future::ready(Ok(stream))
        }
    }
}