aws_smithy_runtime/client/
defaults.rs
1use crate::client::http::body::content_length_enforcement::EnforceContentLengthRuntimePlugin;
13use crate::client::identity::IdentityCache;
14use crate::client::retries::strategy::StandardRetryStrategy;
15use crate::client::retries::RetryPartition;
16use aws_smithy_async::rt::sleep::default_async_sleep;
17use aws_smithy_async::time::SystemTimeSource;
18use aws_smithy_runtime_api::box_error::BoxError;
19use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
20use aws_smithy_runtime_api::client::http::SharedHttpClient;
21use aws_smithy_runtime_api::client::runtime_components::{
22 RuntimeComponentsBuilder, SharedConfigValidator,
23};
24use aws_smithy_runtime_api::client::runtime_plugin::{
25 Order, SharedRuntimePlugin, StaticRuntimePlugin,
26};
27use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
28use aws_smithy_runtime_api::shared::IntoShared;
29use aws_smithy_types::config_bag::{ConfigBag, FrozenLayer, Layer};
30use aws_smithy_types::retry::RetryConfig;
31use aws_smithy_types::timeout::TimeoutConfig;
32use std::borrow::Cow;
33use std::time::Duration;
34
35fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
36where
37 CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
38{
39 StaticRuntimePlugin::new()
40 .with_order(Order::Defaults)
41 .with_runtime_components((components_fn)(RuntimeComponentsBuilder::new(name)))
42}
43
44fn layer<LayerFn>(name: &'static str, layer_fn: LayerFn) -> FrozenLayer
45where
46 LayerFn: FnOnce(&mut Layer),
47{
48 let mut layer = Layer::new(name);
49 (layer_fn)(&mut layer);
50 layer.freeze()
51}
52
53pub fn default_http_client_plugin() -> Option<SharedRuntimePlugin> {
55 let _default: Option<SharedHttpClient> = None;
56 #[cfg(feature = "connector-hyper-0-14-x")]
57 let _default = crate::client::http::hyper_014::default_client();
58
59 _default.map(|default| {
60 default_plugin("default_http_client_plugin", |components| {
61 components.with_http_client(Some(default))
62 })
63 .into_shared()
64 })
65}
66
67pub fn default_sleep_impl_plugin() -> Option<SharedRuntimePlugin> {
69 default_async_sleep().map(|default| {
70 default_plugin("default_sleep_impl_plugin", |components| {
71 components.with_sleep_impl(Some(default))
72 })
73 .into_shared()
74 })
75}
76
77pub fn default_time_source_plugin() -> Option<SharedRuntimePlugin> {
79 Some(
80 default_plugin("default_time_source_plugin", |components| {
81 components.with_time_source(Some(SystemTimeSource::new()))
82 })
83 .into_shared(),
84 )
85}
86
87pub fn default_retry_config_plugin(
89 default_partition_name: impl Into<Cow<'static, str>>,
90) -> Option<SharedRuntimePlugin> {
91 Some(
92 default_plugin("default_retry_config_plugin", |components| {
93 components
94 .with_retry_strategy(Some(StandardRetryStrategy::new()))
95 .with_config_validator(SharedConfigValidator::base_client_config_fn(
96 validate_retry_config,
97 ))
98 })
99 .with_config(layer("default_retry_config", |layer| {
100 layer.store_put(RetryConfig::disabled());
101 layer.store_put(RetryPartition::new(default_partition_name));
102 }))
103 .into_shared(),
104 )
105}
106
107fn validate_retry_config(
108 components: &RuntimeComponentsBuilder,
109 cfg: &ConfigBag,
110) -> Result<(), BoxError> {
111 if let Some(retry_config) = cfg.load::<RetryConfig>() {
112 if retry_config.has_retry() && components.sleep_impl().is_none() {
113 Err("An async sleep implementation is required for retry to work. Please provide a `sleep_impl` on \
114 the config, or disable timeouts.".into())
115 } else {
116 Ok(())
117 }
118 } else {
119 Err(
120 "The default retry config was removed, and no other config was put in its place."
121 .into(),
122 )
123 }
124}
125
126pub fn default_timeout_config_plugin() -> Option<SharedRuntimePlugin> {
128 Some(
129 default_plugin("default_timeout_config_plugin", |components| {
130 components.with_config_validator(SharedConfigValidator::base_client_config_fn(
131 validate_timeout_config,
132 ))
133 })
134 .with_config(layer("default_timeout_config", |layer| {
135 layer.store_put(TimeoutConfig::disabled());
136 }))
137 .into_shared(),
138 )
139}
140
141fn validate_timeout_config(
142 components: &RuntimeComponentsBuilder,
143 cfg: &ConfigBag,
144) -> Result<(), BoxError> {
145 if let Some(timeout_config) = cfg.load::<TimeoutConfig>() {
146 if timeout_config.has_timeouts() && components.sleep_impl().is_none() {
147 Err("An async sleep implementation is required for timeouts to work. Please provide a `sleep_impl` on \
148 the config, or disable timeouts.".into())
149 } else {
150 Ok(())
151 }
152 } else {
153 Err(
154 "The default timeout config was removed, and no other config was put in its place."
155 .into(),
156 )
157 }
158}
159
160pub fn default_identity_cache_plugin() -> Option<SharedRuntimePlugin> {
162 Some(
163 default_plugin("default_identity_cache_plugin", |components| {
164 components.with_identity_cache(Some(IdentityCache::lazy().build()))
165 })
166 .into_shared(),
167 )
168}
169
170#[deprecated(
175 since = "1.2.0",
176 note = "This function wasn't intended to be public, and didn't take the behavior major version as an argument, so it couldn't be evolved over time."
177)]
178pub fn default_stalled_stream_protection_config_plugin() -> Option<SharedRuntimePlugin> {
179 #[allow(deprecated)]
180 default_stalled_stream_protection_config_plugin_v2(BehaviorVersion::v2023_11_09())
181}
182fn default_stalled_stream_protection_config_plugin_v2(
183 behavior_version: BehaviorVersion,
184) -> Option<SharedRuntimePlugin> {
185 Some(
186 default_plugin(
187 "default_stalled_stream_protection_config_plugin",
188 |components| {
189 components.with_config_validator(SharedConfigValidator::base_client_config_fn(
190 validate_stalled_stream_protection_config,
191 ))
192 },
193 )
194 .with_config(layer("default_stalled_stream_protection_config", |layer| {
195 let mut config =
196 StalledStreamProtectionConfig::enabled().grace_period(Duration::from_secs(5));
197 if !behavior_version.is_at_least(BehaviorVersion::v2024_03_28()) {
199 config = config.upload_enabled(false);
200 }
201 layer.store_put(config.build());
202 }))
203 .into_shared(),
204 )
205}
206
207fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
208 Some(EnforceContentLengthRuntimePlugin::new().into_shared())
209}
210
211fn validate_stalled_stream_protection_config(
212 components: &RuntimeComponentsBuilder,
213 cfg: &ConfigBag,
214) -> Result<(), BoxError> {
215 if let Some(stalled_stream_protection_config) = cfg.load::<StalledStreamProtectionConfig>() {
216 if stalled_stream_protection_config.is_enabled() {
217 if components.sleep_impl().is_none() {
218 return Err(
219 "An async sleep implementation is required for stalled stream protection to work. \
220 Please provide a `sleep_impl` on the config, or disable stalled stream protection.".into());
221 }
222
223 if components.time_source().is_none() {
224 return Err(
225 "A time source is required for stalled stream protection to work.\
226 Please provide a `time_source` on the config, or disable stalled stream protection.".into());
227 }
228 }
229
230 Ok(())
231 } else {
232 Err(
233 "The default stalled stream protection config was removed, and no other config was put in its place."
234 .into(),
235 )
236 }
237}
238
239#[non_exhaustive]
243#[derive(Debug, Default)]
244pub struct DefaultPluginParams {
245 retry_partition_name: Option<Cow<'static, str>>,
246 behavior_version: Option<BehaviorVersion>,
247}
248
249impl DefaultPluginParams {
250 pub fn new() -> Self {
252 Default::default()
253 }
254
255 pub fn with_retry_partition_name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
257 self.retry_partition_name = Some(name.into());
258 self
259 }
260
261 pub fn with_behavior_version(mut self, version: BehaviorVersion) -> Self {
263 self.behavior_version = Some(version);
264 self
265 }
266}
267
268pub fn default_plugins(
270 params: DefaultPluginParams,
271) -> impl IntoIterator<Item = SharedRuntimePlugin> {
272 let behavior_version = params
273 .behavior_version
274 .unwrap_or_else(BehaviorVersion::latest);
275
276 [
277 default_http_client_plugin(),
278 default_identity_cache_plugin(),
279 default_retry_config_plugin(
280 params
281 .retry_partition_name
282 .expect("retry_partition_name is required"),
283 ),
284 default_sleep_impl_plugin(),
285 default_time_source_plugin(),
286 default_timeout_config_plugin(),
287 enforce_content_length_runtime_plugin(),
288 default_stalled_stream_protection_config_plugin_v2(behavior_version),
289 ]
290 .into_iter()
291 .flatten()
292 .collect::<Vec<SharedRuntimePlugin>>()
293}
294
295#[cfg(test)]
296mod tests {
297 use super::*;
298 use aws_smithy_runtime_api::client::runtime_plugin::RuntimePlugins;
299
300 fn test_plugin_params(version: BehaviorVersion) -> DefaultPluginParams {
301 DefaultPluginParams::new()
302 .with_behavior_version(version)
303 .with_retry_partition_name("dontcare")
304 }
305 fn config_for(plugins: impl IntoIterator<Item = SharedRuntimePlugin>) -> ConfigBag {
306 let mut config = ConfigBag::base();
307 let plugins = RuntimePlugins::new().with_client_plugins(plugins);
308 plugins.apply_client_configuration(&mut config).unwrap();
309 config
310 }
311
312 #[test]
313 #[allow(deprecated)]
314 fn v2024_03_28_stalled_stream_protection_difference() {
315 let latest = config_for(default_plugins(test_plugin_params(
316 BehaviorVersion::latest(),
317 )));
318 let v2023 = config_for(default_plugins(test_plugin_params(
319 BehaviorVersion::v2023_11_09(),
320 )));
321
322 assert!(
323 latest
324 .load::<StalledStreamProtectionConfig>()
325 .unwrap()
326 .upload_enabled(),
327 "stalled stream protection on uploads MUST be enabled after v2024_03_28"
328 );
329 assert!(
330 !v2023
331 .load::<StalledStreamProtectionConfig>()
332 .unwrap()
333 .upload_enabled(),
334 "stalled stream protection on uploads MUST NOT be enabled before v2024_03_28"
335 );
336 }
337}