aws_smithy_runtime/client/
defaults.rs

1/*
2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6//! Runtime plugins that provide defaults for clients.
7//!
8//! Note: these are the absolute base-level defaults. They may not be the defaults
9//! for _your_ client, since many things can change these defaults on the way to
10//! code generating and constructing a full client.
11
12use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::StandardRetryStrategy;
15use crate::client::retries::RetryPartition;
16use aws_smithy_async::rt::sleep::default_async_sleep;
17use aws_smithy_async::time::SystemTimeSource;
18use aws_smithy_runtime_api::box_error::BoxError;
19use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
20use aws_smithy_runtime_api::client::http::SharedHttpClient;
21use aws_smithy_runtime_api::client::runtime_components::{
22    RuntimeComponentsBuilder, SharedConfigValidator,
23};
24use aws_smithy_runtime_api::client::runtime_plugin::{
25    Order, SharedRuntimePlugin, StaticRuntimePlugin,
26};
27use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
28use aws_smithy_runtime_api::shared::IntoShared;
29use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
30use aws_smithy_types::retry::RetryConfig;
31use aws_smithy_types::timeout::TimeoutConfig;
32use std::borrow::Cow;
33use std::time::Duration;
34
35fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
36where
37    CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
38{
39    StaticRuntimePlugin::new()
40        .with_order(Order::Defaults)
41        .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
42}
43
44fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
45where
46    LayerFn: FnOnce(&mut Layer),
47{
48    let mut layer = Layer::new(name);
49    (layer_fn)(&mut layer);
50    layer.freeze()
51}
52
53/// Runtime plugin that provides a default connector.
54pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
55    let _default: Option<SharedHttpClient> = None;
56    #[cfg(feature = "connector-hyper-0-14-x")]
57    let _default = crate::client::http::hyper_014::default_client();
58
59    _default.map(|default| {
60        default_plugin("default_http_client_plugin", |components| {
61            components.with_http_client(Some(default))
62        })
63        .into_shared()
64    })
65}
66
67/// Runtime plugin that provides a default async sleep implementation.
68pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
69    default_async_sleep().map(|default| {
70        default_plugin("default_sleep_impl_plugin", |components| {
71            components.with_sleep_impl(Some(default))
72        })
73        .into_shared()
74    })
75}
76
77/// Runtime plugin that provides a default time source.
78pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
79    Some(
80        default_plugin("default_time_source_plugin", |components| {
81            components.with_time_source(Some(SystemTimeSource::new()))
82        })
83        .into_shared(),
84    )
85}
86
87/// Runtime plugin that sets the default retry strategy, config (disabled), and partition.
88pub fn default_retry_config_plugin(
89    default_partition_name: impl Into<Cow<'static, str>>,
90) -> Option<SharedRuntimePlugin> {
91    Some(
92        default_plugin("default_retry_config_plugin", |components| {
93            components
94                .with_retry_strategy(Some(StandardRetryStrategy::new()))
95                .with_config_validator(SharedConfigValidator::base_client_config_fn(
96                    validate_retry_config,
97                ))
98        })
99        .with_config(layer("default_retry_config", |layer| {
100            layer.store_put(RetryConfig::disabled());
101            layer.store_put(RetryPartition::new(default_partition_name));
102        }))
103        .into_shared(),
104    )
105}
106
107fn validate_retry_config(
108    components: &RuntimeComponentsBuilder,
109    cfg: &ConfigBag,
110) -> Result<(), BoxError> {
111    if let Some(retry_config) = cfg.load::<RetryConfig>() {
112        if retry_config.has_retry() && components.sleep_impl().is_none() {
113            Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
114                 the config, or disable timeouts.".into())
115        } else {
116            Ok(())
117        }
118    } else {
119        Err(
120            "The default retry config was removed, and no other config was put in its place."
121                .into(),
122        )
123    }
124}
125
126/// Runtime plugin that sets the default timeout config (no timeouts).
127pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
128    Some(
129        default_plugin("default_timeout_config_plugin", |components| {
130            components.with_config_validator(SharedConfigValidator::base_client_config_fn(
131                validate_timeout_config,
132            ))
133        })
134        .with_config(layer("default_timeout_config", |layer| {
135            layer.store_put(TimeoutConfig::disabled());
136        }))
137        .into_shared(),
138    )
139}
140
141fn validate_timeout_config(
142    components: &RuntimeComponentsBuilder,
143    cfg: &ConfigBag,
144) -> Result<(), BoxError> {
145    if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
146        if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
147            Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
148                 the config, or disable timeouts.".into())
149        } else {
150            Ok(())
151        }
152    } else {
153        Err(
154            "The default timeout config was removed, and no other config was put in its place."
155                .into(),
156        )
157    }
158}
159
160/// Runtime plugin that registers the default identity cache implementation.
161pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
162    Some(
163        default_plugin("default_identity_cache_plugin", |components| {
164            components.with_identity_cache(Some(IdentityCache::lazy().build()))
165        })
166        .into_shared(),
167    )
168}
169
170/// Runtime plugin that sets the default stalled stream protection config.
171///
172/// By default, when throughput falls below 1/Bs for more than 5 seconds, the
173/// stream is cancelled.
174#[deprecated(
175    since = "1.2.0",
176    note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
177)]
178pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
179    #[allow(deprecated)]
180    default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
181}
182fn default_stalled_stream_protection_config_plugin_v2(
183    behavior_version: BehaviorVersion,
184) -> Option<SharedRuntimePlugin> {
185    Some(
186        default_plugin(
187            "default_stalled_stream_protection_config_plugin",
188            |components| {
189                components.with_config_validator(SharedConfigValidator::base_client_config_fn(
190                    validate_stalled_stream_protection_config,
191                ))
192            },
193        )
194        .with_config(layer("default_stalled_stream_protection_config", |layer| {
195            let mut config =
196                StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
197            // Before v2024_03_28, upload streams did not have stalled stream protection by default
198            if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
199                config = config.upload_enabled(false);
200            }
201            layer.store_put(config.build());
202        }))
203        .into_shared(),
204    )
205}
206
207fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
208    Some(EnforceContentLengthRuntimePlugin::new().into_shared())
209}
210
211fn validate_stalled_stream_protection_config(
212    components: &RuntimeComponentsBuilder,
213    cfg: &ConfigBag,
214) -> Result<(), BoxError> {
215    if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
216        if stalled_stream_protection_config.is_enabled() {
217            if components.sleep_impl().is_none() {
218                return Err(
219                    "An async sleep implementation is required for stalled stream protection to work. \
220                     Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
221            }
222
223            if components.time_source().is_none() {
224                return Err(
225                    "A time source is required for stalled stream protection to work.\
226                     Please provide a `time_source` on the config, or disable stalled stream protection.".into());
227            }
228        }
229
230        Ok(())
231    } else {
232        Err(
233            "The default stalled stream protection config was removed, and no other config was put in its place."
234                .into(),
235        )
236    }
237}
238
239/// Arguments for the [`default_plugins`] method.
240///
241/// This is a struct to enable adding new parameters in the future without breaking the API.
242#[non_exhaustive]
243#[derive(Debug, Default)]
244pub struct DefaultPluginParams {
245    retry_partition_name: Option<Cow<'static, str>>,
246    behavior_version: Option<BehaviorVersion>,
247}
248
249impl DefaultPluginParams {
250    /// Creates a new [`DefaultPluginParams`].
251    pub fn new() -> Self {
252        Default::default()
253    }
254
255    /// Sets the retry partition name.
256    pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
257        self.retry_partition_name = Some(name.into());
258        self
259    }
260
261    /// Sets the behavior major version.
262    pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
263        self.behavior_version = Some(version);
264        self
265    }
266}
267
268/// All default plugins.
269pub fn default_plugins(
270    params: DefaultPluginParams,
271) -> impl IntoIterator<Item = SharedRuntimePlugin> {
272    let behavior_version = params
273        .behavior_version
274        .unwrap_or_else(BehaviorVersion::latest);
275
276    [
277        default_http_client_plugin(),
278        default_identity_cache_plugin(),
279        default_retry_config_plugin(
280            params
281                .retry_partition_name
282                .expect("retry_partition_name is required"),
283        ),
284        default_sleep_impl_plugin(),
285        default_time_source_plugin(),
286        default_timeout_config_plugin(),
287        enforce_content_length_runtime_plugin(),
288        default_stalled_stream_protection_config_plugin_v2(behavior_version),
289    ]
290    .into_iter()
291    .flatten()
292    .collect::<Vec<SharedRuntimePlugin>>()
293}
294
295#[cfg(test)]
296mod tests {
297    use super::*;
298    use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
299
300    fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
301        DefaultPluginParams::new()
302            .with_behavior_version(version)
303            .with_retry_partition_name("dontcare")
304    }
305    fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
306        let mut config = ConfigBag::base();
307        let plugins = RuntimePlugins::new().with_client_plugins(plugins);
308        plugins.apply_client_configuration(&mut config).unwrap();
309        config
310    }
311
312    #[test]
313    #[allow(deprecated)]
314    fn v2024_03_28_stalled_stream_protection_difference() {
315        let latest = config_for(default_plugins(test_plugin_params(
316            BehaviorVersion::latest(),
317        )));
318        let v2023 = config_for(default_plugins(test_plugin_params(
319            BehaviorVersion::v2023_11_09(),
320        )));
321
322        assert!(
323            latest
324                .load::<StalledStreamProtectionConfig>()
325                .unwrap()
326                .upload_enabled(),
327            "stalled stream protection on uploads MUST be enabled after v2024_03_28"
328        );
329        assert!(
330            !v2023
331                .load::<StalledStreamProtectionConfig>()
332                .unwrap()
333                .upload_enabled(),
334            "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
335        );
336    }
337}