1use std::sync::{Mutex, OnceLock};
7use std::time::{Duration, SystemTime};
8
9use tokio::sync::OwnedSemaphorePermit;
10use tracing::{debug, trace};
11
12use aws_smithy_runtime_api::box_error::BoxError;
13use aws_smithy_runtime_api::client::interceptors::context::{
14 BeforeTransmitInterceptorContextMut, InterceptorContext,
15};
16use aws_smithy_runtime_api::client::interceptors::{dyn_dispatch_hint, Intercept};
17use aws_smithy_runtime_api::client::retries::classifiers::{RetryAction, RetryReason};
18use aws_smithy_runtime_api::client::retries::{RequestAttempts, RetryStrategy, ShouldAttempt};
19use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
20use aws_smithy_types::config_bag::{ConfigBag, Layer, Storable, StoreReplace};
21use aws_smithy_types::retry::{ErrorKind, RetryConfig, RetryMode, RetrySpec};
22
23use crate::client::retries::classifiers::run_classifiers_on_ctx;
24use crate::client::retries::client_rate_limiter::{ClientRateLimiter, RequestReason};
25use crate::client::retries::strategy::standard::ReleaseResult::{
26 APermitWasReleased, NoPermitWasReleased,
27};
28use crate::client::retries::token_bucket::TokenBucket;
29use crate::client::retries::{
30 ClientRateLimiterPartition, LongPollingBackoff, RetryPartition, RetryPartitionInner,
31};
32use crate::static_partition_map::StaticPartitionMap;
33
/// Shared registry of client rate limiters, keyed by [`ClientRateLimiterPartition`].
///
/// `StandardRetryStrategy::adaptive_retry_rate_limiter` looks entries up here (creating them
/// on first use via `get_or_init`) when the configured retry partition is a default one.
static CLIENT_RATE_LIMITER: StaticPartitionMap<ClientRateLimiterPartition, ClientRateLimiter> =
    StaticPartitionMap::new();
36
/// Shared registry of retry token buckets, keyed by [`RetryPartition`].
///
/// `TokenBucketProvider` resolves (and lazily creates) the bucket for default retry
/// partitions through this map.
static TOKEN_BUCKET: StaticPartitionMap<RetryPartition, TokenBucket> = StaticPartitionMap::new();
39
/// Retry strategy combining exponential backoff, a maximum attempt count, and a token bucket.
///
/// The strategy keeps hold of the token-bucket permit acquired for the most recent retry so
/// that the permit can be settled once a later attempt is evaluated.
#[derive(Debug, Default)]
pub struct StandardRetryStrategy {
    /// Permit acquired from the token bucket for the latest retry, if one is being held.
    retry_permit: Mutex<Option<OwnedSemaphorePermit>>,
}
45
// Lets a `StandardRetryStrategy` be stored in a config bag using the `StoreReplace` storer.
// NOTE(review): presumably a later stored value replaces an earlier one — confirm against
// the `StoreReplace` documentation.
impl Storable for StandardRetryStrategy {
    type Storer = StoreReplace<Self>;
}
49
50impl StandardRetryStrategy {
51 pub fn new() -> Self {
53 Default::default()
54 }
55
56 fn release_retry_permit(&self, token_bucket: &TokenBucket) -> ReleaseResult {
57 let mut retry_permit = self.retry_permit.lock().unwrap();
58 match retry_permit.take() {
59 Some(p) => {
60 if token_bucket.success_reward() > 0.0 {
62 token_bucket.reward_success();
63 p.forget();
64 } else {
65 drop(p); }
67 APermitWasReleased
68 }
69 None => {
70 if token_bucket.success_reward() > 0.0 {
72 token_bucket.reward_success();
73 } else {
74 token_bucket.regenerate_a_token();
75 }
76 NoPermitWasReleased
77 }
78 }
79 }
80
81 fn set_retry_permit(&self, new_retry_permit: OwnedSemaphorePermit) {
82 let mut old_retry_permit = self.retry_permit.lock().unwrap();
83 if let Some(p) = old_retry_permit.replace(new_retry_permit) {
84 p.forget()
87 }
88 }
89
90 fn adaptive_retry_rate_limiter(
92 runtime_components: &RuntimeComponents,
93 cfg: &ConfigBag,
94 ) -> Option<ClientRateLimiter> {
95 let retry_config = cfg.load::<RetryConfig>().expect("retry config is required");
96 if retry_config.mode() == RetryMode::Adaptive {
97 if let Some(time_source) = runtime_components.time_source() {
98 let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
99 let seconds_since_unix_epoch = time_source
100 .now()
101 .duration_since(SystemTime::UNIX_EPOCH)
102 .expect("the present takes place after the UNIX_EPOCH")
103 .as_secs_f64();
104 let client_rate_limiter = match &retry_partition.inner {
105 RetryPartitionInner::Default(_) => {
106 let client_rate_limiter_partition =
107 ClientRateLimiterPartition::new(retry_partition.clone());
108 CLIENT_RATE_LIMITER.get_or_init(client_rate_limiter_partition, || {
109 ClientRateLimiter::new(seconds_since_unix_epoch)
110 })
111 }
112 RetryPartitionInner::Custom {
113 client_rate_limiter,
114 ..
115 } => client_rate_limiter.clone(),
116 };
117 return Some(client_rate_limiter);
118 }
119 }
120 None
121 }
122
123 fn calculate_backoff(
124 &self,
125 runtime_components: &RuntimeComponents,
126 cfg: &ConfigBag,
127 retry_cfg: &RetryConfig,
128 retry_reason: &RetryAction,
129 ) -> Result<Duration, ShouldAttempt> {
130 let request_attempts = cfg
131 .load::<RequestAttempts>()
132 .expect("at least one request attempt is made before any retry is attempted")
133 .attempts();
134
135 match retry_reason {
136 RetryAction::RetryIndicated(RetryReason::RetryableError { kind, retry_after }) => {
137 let initial_backoff = if *kind != ErrorKind::ThrottlingError {
138 retry_cfg
139 .retry_spec()
140 .map(|s| s.non_throttling_initial_backoff())
141 .unwrap_or(retry_cfg.initial_backoff())
142 .as_secs_f64()
143 } else {
144 retry_cfg.initial_backoff().as_secs_f64()
145 };
146
147 if let Some(delay) = check_rate_limiter_for_delay(runtime_components, cfg, *kind) {
148 let delay = delay.min(retry_cfg.max_backoff());
149 debug!("rate limiter has requested a {delay:?} delay before retrying");
150 Ok(delay)
151 } else {
152 let base = if retry_cfg.use_static_exponential_base() {
153 1.0
154 } else {
155 fastrand::f64()
156 };
157 let t_i = calculate_exponential_backoff(
158 base,
159 initial_backoff,
160 request_attempts - 1,
161 retry_cfg.max_backoff(),
162 );
163
164 if let Some(retry_after) = *retry_after {
165 if retry_cfg
166 .retry_spec()
167 .is_some_and(|s| s.is_at_least(RetrySpec::V2_1))
168 {
169 let delay = retry_after.clamp(t_i, t_i + Duration::from_secs(5));
170 debug!("x-amz-retry-after bounded to {delay:?} (t_i={t_i:?})");
171 Ok(delay)
172 } else {
173 let delay = retry_after.min(retry_cfg.max_backoff());
174 debug!(
175 "explicit request from server to delay {delay:?} before retrying"
176 );
177 Ok(delay)
178 }
179 } else {
180 Ok(t_i)
181 }
182 }
183 }
184 RetryAction::RetryForbidden | RetryAction::NoActionIndicated => {
185 debug!(
186 attempts = request_attempts,
187 max_attempts = retry_cfg.max_attempts(),
188 "encountered un-retryable error"
189 );
190 Err(ShouldAttempt::No)
191 }
192 _ => unreachable!("RetryAction is non-exhaustive"),
193 }
194 }
195}
196
/// Outcome of `StandardRetryStrategy::release_retry_permit`: whether the strategy was holding
/// a retry permit when the call was made.
enum ReleaseResult {
    /// A retry permit was being held and has been taken out of the strategy.
    APermitWasReleased,
    /// No retry permit was being held.
    NoPermitWasReleased,
}
201
202impl RetryStrategy for StandardRetryStrategy {
203 fn should_attempt_initial_request(
204 &self,
205 runtime_components: &RuntimeComponents,
206 cfg: &ConfigBag,
207 ) -> Result<ShouldAttempt, BoxError> {
208 if let Some(crl) = Self::adaptive_retry_rate_limiter(runtime_components, cfg) {
209 let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
210 if let Err(delay) = crl.acquire_permission_to_send_a_request(
211 seconds_since_unix_epoch,
212 RequestReason::InitialRequest,
213 ) {
214 return Ok(ShouldAttempt::YesAfterDelay(delay));
215 }
216 } else {
217 debug!("no client rate limiter configured, so no token is required for the initial request.");
218 }
219
220 Ok(ShouldAttempt::Yes)
221 }
222
223 fn should_attempt_retry(
224 &self,
225 ctx: &InterceptorContext,
226 runtime_components: &RuntimeComponents,
227 cfg: &ConfigBag,
228 ) -> Result<ShouldAttempt, BoxError> {
229 let retry_cfg = cfg.load::<RetryConfig>().expect("retry config is required");
230
231 let token_bucket = cfg.load::<TokenBucket>().expect("token bucket is required");
233 let retry_classifiers = runtime_components.retry_classifiers();
235 let classifier_result = run_classifiers_on_ctx(retry_classifiers, ctx);
236
237 let error_kind = error_kind(&classifier_result);
242 let is_throttling_error = error_kind
243 .map(|kind| kind == ErrorKind::ThrottlingError)
244 .unwrap_or(false);
245 update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);
246
247 if !ctx.is_failed() {
249 self.release_retry_permit(token_bucket);
250 }
251 let request_attempts = cfg
254 .load::<RequestAttempts>()
255 .expect("at least one request attempt is made before any retry is attempted")
256 .attempts();
257
258 if !classifier_result.should_retry() {
260 debug!(
261 "attempt #{request_attempts} classified as {:?}, not retrying",
262 classifier_result
263 );
264 return Ok(ShouldAttempt::No);
265 }
266
267 if request_attempts >= retry_cfg.max_attempts() {
269 debug!(
270 attempts = request_attempts,
271 max_attempts = retry_cfg.max_attempts(),
272 "not retrying because we are out of attempts"
273 );
274 return Ok(ShouldAttempt::No);
275 }
276
277 let error_kind = error_kind.expect("result was classified retryable");
279 let is_long_polling = retry_cfg.retry_spec().is_some_and(|s| s.long_polling());
280
281 let backoff =
284 match self.calculate_backoff(runtime_components, cfg, retry_cfg, &classifier_result) {
285 Ok(value) => value,
286 Err(value) => return Ok(value),
287 };
288
289 match token_bucket.acquire(
291 &error_kind,
292 &runtime_components.time_source().unwrap_or_default(),
293 ) {
294 Some(permit) => self.set_retry_permit(permit),
295 None => {
296 debug!("attempt #{request_attempts} failed with {error_kind:?}; not enough retry quota.");
297 if is_long_polling {
298 if let Some(hint) = cfg.load::<LongPollingBackoff>() {
299 hint.set(backoff);
300 }
301 }
302 return Ok(ShouldAttempt::No);
303 }
304 }
305
306 debug!(
307 "attempt #{request_attempts} failed with {:?}; retrying after {:?}",
308 classifier_result, backoff
309 );
310 Ok(ShouldAttempt::YesAfterDelay(backoff))
311 }
312}
313
314fn error_kind(classifier_result: &RetryAction) -> Option<ErrorKind> {
316 match classifier_result {
317 RetryAction::RetryIndicated(RetryReason::RetryableError { kind, .. }) => Some(*kind),
318 _ => None,
319 }
320}
321
322fn update_rate_limiter_if_exists(
323 runtime_components: &RuntimeComponents,
324 cfg: &ConfigBag,
325 is_throttling_error: bool,
326) {
327 if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
328 let seconds_since_unix_epoch = get_seconds_since_unix_epoch(runtime_components);
329 crl.update_rate_limiter(seconds_since_unix_epoch, is_throttling_error);
330 }
331}
332
333fn check_rate_limiter_for_delay(
334 runtime_components: &RuntimeComponents,
335 cfg: &ConfigBag,
336 kind: ErrorKind,
337) -> Option<Duration> {
338 if let Some(crl) = StandardRetryStrategy::adaptive_retry_rate_limiter(runtime_components, cfg) {
339 let retry_reason = if kind == ErrorKind::ThrottlingError {
340 RequestReason::RetryTimeout
341 } else {
342 RequestReason::Retry
343 };
344 if let Err(delay) = crl.acquire_permission_to_send_a_request(
345 get_seconds_since_unix_epoch(runtime_components),
346 retry_reason,
347 ) {
348 return Some(delay);
349 }
350 }
351
352 None
353}
354
355pub(super) fn calculate_exponential_backoff(
356 base: f64,
357 initial_backoff: f64,
358 retry_attempts: u32,
359 max_backoff: Duration,
360) -> Duration {
361 let result = match 2_u32
362 .checked_pow(retry_attempts)
363 .map(|power| (power as f64) * initial_backoff)
364 {
365 Some(backoff) => match Duration::try_from_secs_f64(backoff) {
366 Ok(result) => result.min(max_backoff),
367 Err(e) => {
368 tracing::warn!("falling back to {max_backoff:?} as `Duration` could not be created for exponential backoff: {e}");
369 max_backoff
370 }
371 },
372 None => max_backoff,
373 };
374
375 result.mul_f64(base)
378}
379
380pub(super) fn get_seconds_since_unix_epoch(runtime_components: &RuntimeComponents) -> f64 {
381 let request_time = runtime_components
382 .time_source()
383 .expect("time source required for retries");
384 request_time
385 .now()
386 .duration_since(SystemTime::UNIX_EPOCH)
387 .unwrap()
388 .as_secs_f64()
389}
390
/// Interceptor that resolves the [`TokenBucket`] for a request's retry partition and stores it
/// in the config bag before the retry loop runs.
#[derive(Debug)]
pub(crate) struct TokenBucketProvider {
    /// Partition this provider was created with; its name is compared against the
    /// partition found in the config bag.
    default_partition: RetryPartition,
    /// Lazily initialised bucket for `default_partition`, so the shared `TOKEN_BUCKET` map
    /// is only consulted when this cell is first filled.
    token_bucket: OnceLock<TokenBucket>,
}
399
400impl TokenBucketProvider {
401 pub(crate) fn new(default_partition: RetryPartition) -> Self {
406 Self {
407 default_partition,
408 token_bucket: OnceLock::new(),
409 }
410 }
411}
412
413fn token_bucket_for_spec(cfg: &ConfigBag) -> TokenBucket {
415 let is_v2_1 = cfg
416 .load::<RetryConfig>()
417 .and_then(|rc| rc.retry_spec())
418 .is_some_and(|s| s.is_at_least(RetrySpec::V2_1));
419 if is_v2_1 {
420 TokenBucket::builder()
421 .retry_cost(14)
422 .throttling_retry_cost(5)
423 .timeout_retry_cost(14)
424 .build()
425 } else {
426 TokenBucket::default()
427 }
428}
429
430#[dyn_dispatch_hint]
431impl Intercept for TokenBucketProvider {
432 fn name(&self) -> &'static str {
433 "TokenBucketProvider"
434 }
435
436 fn modify_before_retry_loop(
437 &self,
438 _context: &mut BeforeTransmitInterceptorContextMut<'_>,
439 _runtime_components: &RuntimeComponents,
440 cfg: &mut ConfigBag,
441 ) -> Result<(), BoxError> {
442 let retry_partition = cfg.load::<RetryPartition>().expect("set in default config");
443
444 let tb = match &retry_partition.inner {
445 RetryPartitionInner::Default(name) => {
446 if name == self.default_partition.name() {
447 self.token_bucket
448 .get_or_init(|| {
449 TOKEN_BUCKET.get_or_init(self.default_partition.clone(), || {
450 token_bucket_for_spec(cfg)
451 })
452 })
453 .clone()
454 } else {
455 TOKEN_BUCKET.get_or_init(retry_partition.clone(), || token_bucket_for_spec(cfg))
456 }
457 }
458 RetryPartitionInner::Custom { token_bucket, .. } => token_bucket.clone(),
459 };
460
461 trace!("token bucket for {retry_partition:?} added to config bag");
462 let mut layer = Layer::new("token_bucket_partition");
463 layer.store_put(tb);
464 cfg.push_layer(layer);
465 Ok(())
466 }
467}
468
469#[cfg(test)]
470mod tests {
471 #[allow(unused_imports)] use std::fmt;
473 use std::sync::Mutex;
474 use std::time::Duration;
475
476 use aws_smithy_async::time::SystemTimeSource;
477 use aws_smithy_runtime_api::client::interceptors::context::{
478 Input, InterceptorContext, Output,
479 };
480 use aws_smithy_runtime_api::client::orchestrator::OrchestratorError;
481 use aws_smithy_runtime_api::client::retries::classifiers::{
482 ClassifyRetry, RetryAction, SharedRetryClassifier,
483 };
484 use aws_smithy_runtime_api::client::retries::{
485 AlwaysRetry, RequestAttempts, RetryStrategy, ShouldAttempt,
486 };
487 use aws_smithy_runtime_api::client::runtime_components::{
488 RuntimeComponents, RuntimeComponentsBuilder,
489 };
490 use aws_smithy_types::config_bag::{ConfigBag, Layer};
491 use aws_smithy_types::retry::{ErrorKind, RetryConfig};
492
493 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
494 use aws_smithy_types::retry::RetrySpec;
495
496 use super::{calculate_exponential_backoff, StandardRetryStrategy};
497 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
498 use crate::client::retries::token_bucket::{
499 DEFAULT_CAPACITY, DEFAULT_RETRY_COST, DEFAULT_RETRY_TIMEOUT_COST, THROTTLING_RETRY_COST,
500 };
501 #[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
502 use crate::client::retries::LongPollingBackoff;
503 use crate::client::retries::{ClientRateLimiter, RetryPartition, TokenBucket};
504
/// A successful output, with no retry classifier registered, must yield `ShouldAttempt::No`.
#[test]
fn no_retry_necessary_for_ok_result() {
    let mut layer = Layer::new("test");
    layer.store_put(RetryConfig::standard());
    layer.store_put(RequestAttempts::new(1));
    layer.store_put(TokenBucket::default());
    let cfg = ConfigBag::of_layers(vec![layer]);

    let rc = RuntimeComponentsBuilder::for_tests().build().unwrap();
    let mut ctx = InterceptorContext::new(Input::doesnt_matter());
    let strategy = StandardRetryStrategy::default();
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));

    let actual = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    assert_eq!(ShouldAttempt::No, actual);
}
524
525 fn set_up_cfg_and_context(
526 error_kind: ErrorKind,
527 current_request_attempts: u32,
528 retry_config: RetryConfig,
529 ) -> (InterceptorContext, RuntimeComponents, ConfigBag) {
530 let mut ctx = InterceptorContext::new(Input::doesnt_matter());
531 ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
532 let rc = RuntimeComponentsBuilder::for_tests()
533 .with_retry_classifier(SharedRetryClassifier::new(AlwaysRetry(error_kind)))
534 .build()
535 .unwrap();
536 let mut layer = Layer::new("test");
537 layer.store_put(RequestAttempts::new(current_request_attempts));
538 layer.store_put(retry_config);
539 layer.store_put(TokenBucket::default());
540 let cfg = ConfigBag::of_layers(vec![layer]);
541
542 (ctx, rc, cfg)
543 }
544
545 fn test_should_retry_error_kind(error_kind: ErrorKind) {
548 let (ctx, rc, cfg) = set_up_cfg_and_context(
549 error_kind,
550 3,
551 RetryConfig::standard()
552 .with_use_static_exponential_base(true)
553 .with_max_attempts(4),
554 );
555 let strategy = StandardRetryStrategy::new();
556 let actual = strategy
557 .should_attempt_retry(&ctx, &rc, &cfg)
558 .expect("method is infallible for this use");
559 assert_eq!(ShouldAttempt::YesAfterDelay(Duration::from_secs(4)), actual);
560 }
561
// Transient errors are retried. NOTE: despite the `_after_2s` suffix, the shared helper
// `test_should_retry_error_kind` asserts a 4 second delay.
#[test]
fn should_retry_transient_error_result_after_2s() {
    test_should_retry_error_kind(ErrorKind::TransientError);
}
566
// Client errors are retried when the classifier says so. NOTE: despite the `_after_2s`
// suffix, the shared helper `test_should_retry_error_kind` asserts a 4 second delay.
#[test]
fn should_retry_client_error_result_after_2s() {
    test_should_retry_error_kind(ErrorKind::ClientError);
}
571
// Server errors are retried. NOTE: despite the `_after_2s` suffix, the shared helper
// `test_should_retry_error_kind` asserts a 4 second delay.
#[test]
fn should_retry_server_error_result_after_2s() {
    test_should_retry_error_kind(ErrorKind::ServerError);
}
576
// Throttling errors are retried. NOTE: despite the `_after_2s` suffix, the shared helper
// `test_should_retry_error_kind` asserts a 4 second delay.
#[test]
fn should_retry_throttling_error_result_after_2s() {
    test_should_retry_error_kind(ErrorKind::ThrottlingError);
}
581
/// Once the attempt count has reached `max_attempts`, a retryable error is not retried.
#[test]
fn dont_retry_when_out_of_attempts() {
    let current_attempts = 4;
    let max_attempts = current_attempts;
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(max_attempts);
    let (ctx, rc, cfg) =
        set_up_cfg_and_context(ErrorKind::TransientError, current_attempts, retry_config);
    let strategy = StandardRetryStrategy::new();

    let actual = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    assert_eq!(ShouldAttempt::No, actual);
}
599
/// With 33 attempts made, the backoff exponent is 32 and `2_u32.checked_pow(32)` overflows;
/// the strategy must then fall back to the maximum backoff instead of panicking.
#[test]
fn should_not_panic_when_exponential_backoff_duration_could_not_be_created() {
    // One more than 32, because the strategy subtracts the initial attempt from the exponent.
    let attempts_so_far = 33;
    // Only needs to exceed `attempts_so_far` so that a retry is still allowed.
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(100);
    let (ctx, rc, cfg) =
        set_up_cfg_and_context(ErrorKind::TransientError, attempts_so_far, retry_config);
    let strategy = StandardRetryStrategy::new();

    let actual = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    assert_eq!(ShouldAttempt::YesAfterDelay(MAX_BACKOFF), actual);
}
616
/// In adaptive mode with a custom retry partition, the rate limiter handed out must be the
/// very instance the partition was built with (checked by pointer identity).
#[test]
fn should_yield_client_rate_limiter_from_custom_partition() {
    let expected = ClientRateLimiter::builder().token_refill_rate(3.14).build();

    // Layer carrying a default partition, pushed first.
    let mut default_layer = Layer::new("default");
    default_layer.store_put(RetryPartition::new("default"));

    // Layer carrying the custom partition, pushed on top of the default one.
    let mut user_layer = Layer::new("user");
    user_layer.store_put(RetryConfig::adaptive());
    user_layer.store_put(
        RetryPartition::custom("user")
            .client_rate_limiter(expected.clone())
            .build(),
    );

    let cfg = ConfigBag::of_layers(vec![default_layer, user_layer]);
    let rc = RuntimeComponentsBuilder::for_tests()
        .with_time_source(Some(SystemTimeSource::new()))
        .build()
        .unwrap();

    let actual = StandardRetryStrategy::adaptive_retry_rate_limiter(&rc, &cfg)
        .expect("should yield client rate limiter from custom partition");

    assert!(std::sync::Arc::ptr_eq(&expected.inner, &actual.inner));
}
646
// Test classifier that replays a preset list of retry actions for failed attempts.
// NOTE(review): its constructor is only compiled with the `test-util` / `legacy-test-util`
// features, which is presumably why `dead_code` is allowed here — confirm.
#[allow(dead_code)] #[derive(Debug)]
struct PresetReasonRetryClassifier {
    // Remaining actions; consumed from the back by `classify_retry`.
    retry_actions: Mutex<Vec<RetryAction>>,
}
652
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
impl PresetReasonRetryClassifier {
    /// Creates a classifier that hands out `retry_reasons` in the order given.
    fn new(retry_reasons: Vec<RetryAction>) -> Self {
        // Stored back to front so that popping from the end yields the first action first.
        let replay_order: Vec<RetryAction> = retry_reasons.into_iter().rev().collect();
        Self {
            retry_actions: Mutex::new(replay_order),
        }
    }
}
663
664 impl ClassifyRetry for PresetReasonRetryClassifier {
665 fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
666 let output_or_error = ctx.output_or_error();
668 match output_or_error {
670 Some(Ok(_)) | None => return RetryAction::NoActionIndicated,
671 _ => (),
672 };
673
674 let mut retry_actions = self.retry_actions.lock().unwrap();
675 if retry_actions.len() == 1 {
676 retry_actions.first().unwrap().clone()
677 } else {
678 retry_actions.pop().unwrap()
679 }
680 }
681
682 fn name(&self) -> &'static str {
683 "Always returns a preset retry reason"
684 }
685 }
686
/// Builds fixtures for the token-bucket tests.
///
/// Returns a config bag holding only `retry_config`, runtime components whose classifier
/// replays `retry_reasons`, and a context whose result is an error. Callers add the token
/// bucket and the attempt counter themselves.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
fn setup_test(
    retry_reasons: Vec<RetryAction>,
    retry_config: RetryConfig,
) -> (ConfigBag, RuntimeComponents, InterceptorContext) {
    let classifier = SharedRetryClassifier::new(PresetReasonRetryClassifier::new(retry_reasons));
    let rc = RuntimeComponentsBuilder::for_tests()
        .with_retry_classifier(classifier)
        .build()
        .unwrap();

    let cfg = {
        let mut layer = Layer::new("test");
        layer.store_put(retry_config);
        ConfigBag::of_layers(vec![layer])
    };

    let mut ctx = InterceptorContext::new(Input::doesnt_matter());
    ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));

    (cfg, rc, ctx)
}
707
/// Two failed attempts followed by a success: the delays are 1s then 2s, the permit count
/// is 495 then 490 after the two retries, and it is back at 495 after the success.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn eventual_success() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5);
    let (mut cfg, rc, mut ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    // (attempt number, expected delay in seconds, expected permits left afterwards)
    for (attempt, delay_secs, permits_left) in [(1, 1, 495), (2, 2, 490)] {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(should_retry.expect_delay(), Duration::from_secs(delay_secs));
        assert_eq!(token_bucket.available_permits(), permits_left);
    }

    // The third attempt succeeds.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(3));
    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);
    assert_eq!(token_bucket.available_permits(), 495);
}
740
/// With `max_attempts` of 3, the first two failures are retried (1s, then 2s) and the third
/// is not; the refused retry leaves the permit count at 490.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn no_more_attempts() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(3);
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    // (attempt number, expected delay in seconds, expected permits left afterwards)
    for (attempt, delay_secs, permits_left) in [(1, 1, 495), (2, 2, 490)] {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(should_retry.expect_delay(), Duration::from_secs(delay_secs));
        assert_eq!(token_bucket.available_permits(), permits_left);
    }

    // The third attempt exhausts `max_attempts`.
    cfg.interceptor_state().store_put(RequestAttempts::new(3));
    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);
    assert_eq!(token_bucket.available_permits(), 490);
}
771
/// A successful response can still be classified as retryable (waiter-style polling).
///
/// A `Running` status makes the classifier ask for a retry, which uses all 5 permits of the
/// bucket; a later `Complete` status ends the retries and the bucket is back at 5 permits.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn successful_request_and_deser_should_be_retryable() {
    #[derive(Clone, Copy, Debug)]
    enum LongRunningOperationStatus {
        Running,
        Complete,
    }

    #[derive(Debug)]
    struct LongRunningOperationOutput {
        status: Option<LongRunningOperationStatus>,
    }

    impl LongRunningOperationOutput {
        fn status(&self) -> Option<LongRunningOperationStatus> {
            self.status
        }
    }

    struct WaiterRetryClassifier {}

    impl WaiterRetryClassifier {
        fn new() -> Self {
            Self {}
        }
    }

    impl fmt::Debug for WaiterRetryClassifier {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("WaiterRetryClassifier")
        }
    }

    impl ClassifyRetry for WaiterRetryClassifier {
        // Retries only while the (successfully deserialized) output reports `Running`.
        fn classify_retry(&self, ctx: &InterceptorContext) -> RetryAction {
            let status = ctx
                .output_or_error()
                .and_then(|res| res.ok())
                .and_then(|output| output.downcast_ref::<LongRunningOperationOutput>())
                .and_then(|output| output.status());

            match status {
                Some(LongRunningOperationStatus::Running) => RetryAction::server_error(),
                _ => RetryAction::NoActionIndicated,
            }
        }

        fn name(&self) -> &'static str {
            "waiter retry classifier"
        }
    }

    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5);

    let classifier = SharedRetryClassifier::new(WaiterRetryClassifier::new());
    let rc = RuntimeComponentsBuilder::for_tests()
        .with_retry_classifier(classifier)
        .build()
        .unwrap();
    let mut layer = Layer::new("test");
    layer.store_put(retry_config);
    let mut cfg = ConfigBag::of_layers(vec![layer]);
    let mut ctx = InterceptorContext::new(Input::doesnt_matter());
    let strategy = StandardRetryStrategy::new();

    // First poll: the operation is still running.
    let still_running = LongRunningOperationOutput {
        status: Some(LongRunningOperationStatus::Running),
    };
    ctx.set_output_or_error(Ok(Output::erase(still_running)));

    cfg.interceptor_state().store_put(TokenBucket::new(5));
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(should_retry.expect_delay(), Duration::from_secs(1));
    assert_eq!(token_bucket.available_permits(), 0);

    // Second poll: the operation has completed.
    let completed = LongRunningOperationOutput {
        status: Some(LongRunningOperationStatus::Complete),
    };
    ctx.set_output_or_error(Ok(Output::erase(completed)));
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    should_retry.expect_no();
    assert_eq!(token_bucket.available_permits(), 5);
}
863
/// A bucket of 5 permits is emptied by the first retry; the second failure finds no quota
/// and is therefore not retried.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn no_quota() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5);
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::new(5));
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    // First failure: retried, using up the whole bucket.
    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let first_verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(first_verdict.expect_delay(), Duration::from_secs(1));
    assert_eq!(token_bucket.available_permits(), 0);

    // Second failure: nothing left to pay for another retry.
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let second_verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(second_verdict, ShouldAttempt::No);
    assert_eq!(token_bucket.available_permits(), 0);
}
888
/// Two transient failures take a 100-permit bucket to 90 and then 80 (both with a 1s
/// delay, the second error carrying an explicit 1s delay); the success that follows brings
/// the bucket back to 90.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn quota_replenishes_on_success() {
    let preset_actions = vec![
        RetryAction::transient_error(),
        RetryAction::retryable_error_with_explicit_delay(
            ErrorKind::TransientError,
            Duration::from_secs(1),
        ),
    ];
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5);
    let (mut cfg, rc, mut ctx) = setup_test(preset_actions, retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::new(100));
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    // (attempt number, expected delay in seconds, expected permits left afterwards)
    for (attempt, delay_secs, permits_left) in [(1, 1, 90), (2, 1, 80)] {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(should_retry.expect_delay(), Duration::from_secs(delay_secs));
        assert_eq!(token_bucket.available_permits(), permits_left);
    }

    // The third attempt succeeds.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(3));
    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);

    assert_eq!(token_bucket.available_permits(), 90);
}
928
/// Successes with no retry permit held refill the bucket step by step.
///
/// The 20-permit bucket is first drained by failed attempts (at most two are allowed), the
/// held permit is forgotten so that it cannot refill the bucket, and then successful
/// attempts are replayed until the bucket is full again. The attempt counter is expected to
/// stand at 23 when that happens.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn quota_replenishes_on_first_try_success() {
    const PERMIT_COUNT: usize = 20;
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(u32::MAX);
    let (mut cfg, rc, mut ctx) = setup_test(vec![RetryAction::transient_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state()
        .store_put(TokenBucket::new(PERMIT_COUNT));
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    let mut attempt = 1;

    // Phase 1: drain every permit with failing attempts.
    while token_bucket.available_permits() > 0 {
        assert!(
            attempt <= 2,
            "This test should have completed by now (drain)"
        );

        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert!(matches!(verdict, ShouldAttempt::YesAfterDelay(_)));
        attempt += 1;
    }

    // Forget the held permit so that only permit-less successes can refill the bucket.
    strategy
        .retry_permit
        .lock()
        .unwrap()
        .take()
        .unwrap()
        .forget();

    ctx.set_output_or_error(Ok(Output::doesnt_matter()));

    // Phase 2: refill with successful attempts until the bucket is full again.
    while token_bucket.available_permits() < PERMIT_COUNT {
        assert!(
            attempt <= 23,
            "This test should have completed by now (fill-up)"
        );

        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(verdict, ShouldAttempt::No);
        attempt += 1;
    }

    assert_eq!(attempt, 23);
    assert_eq!(token_bucket.available_permits(), PERMIT_COUNT);
}
982
/// With jitter disabled the delay doubles on every retry (1s, 2s, 4s, 8s) while the permit
/// count falls by 5 each time; the fifth attempt reaches `max_attempts` and is not retried.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn backoff_timing() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5);
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    let token_bucket = cfg.load::<TokenBucket>().unwrap().clone();

    // (attempt number, expected delay in seconds, expected permits left afterwards)
    let retried_attempts = [(1, 1, 495), (2, 2, 490), (3, 4, 485), (4, 8, 480)];
    for (attempt, delay_secs, permits_left) in retried_attempts {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(should_retry.expect_delay(), Duration::from_secs(delay_secs));
        assert_eq!(token_bucket.available_permits(), permits_left);
    }

    // The fifth attempt is the last one allowed, so no retry and no further permit use.
    cfg.interceptor_state().store_put(RequestAttempts::new(5));
    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);
    assert_eq!(token_bucket.available_permits(), 480);
}
1025
/// Checks that the configured `max_backoff` caps the exponential delay.
///
/// With an initial backoff of 1s and a maximum of 3s, attempts 1 through 4 must be retried
/// after 1s, 2s, 3s and 3s, leaving 495, 490, 485 and 480 permits respectively. Attempt 5
/// reaches `max_attempts` and must be refused without changing the permit count.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn max_backoff_time() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5)
        .with_initial_backoff(Duration::from_secs(1))
        .with_max_backoff(Duration::from_secs(3));
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    let bucket = cfg.load::<TokenBucket>().cloned().unwrap();

    // (attempt number, expected delay in seconds, permits left once the retry is granted)
    let schedule = [(1, 1, 495), (2, 2, 490), (3, 3, 485), (4, 3, 480)];
    for (attempt, delay_secs, permits_left) in schedule {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(decision.expect_delay(), Duration::from_secs(delay_secs));
        assert_eq!(bucket.available_permits(), permits_left);
    }

    // The final attempt is out of budget: no retry, and no permits are taken.
    cfg.interceptor_state().store_put(RequestAttempts::new(5));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), 480);
}
1070
/// Backoff ceiling (20 seconds) passed to `calculate_exponential_backoff` by the tests below.
const MAX_BACKOFF: Duration = Duration::from_millis(20_000);
1072
/// With a base of 1.0 and an initial backoff of 1s, attempts 0, 1 and 2 must yield
/// 1s, 2s and 4s.
#[test]
fn calculate_exponential_backoff_where_initial_backoff_is_one() {
    let initial_backoff = 1.0;
    let expected_secs = [initial_backoff, 2.0, 4.0];

    // Pair each expectation with its zero-based attempt number, already typed as `u32`.
    for (attempt, expected) in (0u32..).zip(expected_secs) {
        let actual = calculate_exponential_backoff(1.0, initial_backoff, attempt, MAX_BACKOFF);
        assert_eq!(Duration::from_secs_f64(expected), actual);
    }
}
1083
/// With a base of 1.0 and an initial backoff of 3s, attempts 0, 1 and 2 must yield
/// 3s, 6s and 12s.
#[test]
fn calculate_exponential_backoff_where_initial_backoff_is_greater_than_one() {
    let initial_backoff = 3.0;
    let expected_secs = [initial_backoff, 6.0, 12.0];

    // Pair each expectation with its zero-based attempt number, already typed as `u32`.
    for (attempt, expected) in (0u32..).zip(expected_secs) {
        let actual = calculate_exponential_backoff(1.0, initial_backoff, attempt, MAX_BACKOFF);
        assert_eq!(Duration::from_secs_f64(expected), actual);
    }
}
1094
/// With a base of 1.0 and an initial backoff of 0.03s, attempts 0, 1 and 2 must yield
/// 0.03s, 0.06s and 0.12s.
#[test]
fn calculate_exponential_backoff_where_initial_backoff_is_less_than_one() {
    let initial_backoff = 0.03;
    let expected_secs = [initial_backoff, 0.06, 0.12];

    // Pair each expectation with its zero-based attempt number, already typed as `u32`.
    for (attempt, expected) in (0u32..).zip(expected_secs) {
        let actual = calculate_exponential_backoff(1.0, initial_backoff, attempt, MAX_BACKOFF);
        assert_eq!(Duration::from_secs_f64(expected), actual);
    }
}
1105
/// A very large attempt count (100000) with an initial backoff of 10s must produce exactly
/// `MAX_BACKOFF` rather than panicking or returning some other value.
#[test]
fn calculate_backoff_overflow_should_gracefully_fallback_to_max_backoff() {
    let backoff = calculate_exponential_backoff(1_f64, 10_f64, 100_000, MAX_BACKOFF);
    assert_eq!(MAX_BACKOFF, backoff);
}
1114
/// Under `RetrySpec::v2_1()`, a `ServerError` must be retried after a 50ms delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_1_non_throttling_uses_50ms_backoff() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(3)
        .with_retry_spec(RetrySpec::v2_1());
    let (ctx, rc, cfg) = set_up_cfg_and_context(ErrorKind::ServerError, 1, retry_config);

    let decision = StandardRetryStrategy::new()
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_millis(50));
    assert_eq!(expected, decision);
}
1136
/// Under `RetrySpec::v2_1()`, a `ThrottlingError` must be retried after a 1s delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_1_throttling_uses_1s_backoff() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(3)
        .with_retry_spec(RetrySpec::v2_1());
    let (ctx, rc, cfg) = set_up_cfg_and_context(ErrorKind::ThrottlingError, 1, retry_config);

    let decision = StandardRetryStrategy::new()
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_secs(1));
    assert_eq!(expected, decision);
}
1155
/// Under `RetrySpec::v2_0()`, a `ServerError` must be retried after a 1s delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_0_non_throttling_uses_1s_backoff() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(3)
        .with_retry_spec(RetrySpec::v2_0());
    let (ctx, rc, cfg) = set_up_cfg_and_context(ErrorKind::ServerError, 1, retry_config);

    let decision = StandardRetryStrategy::new()
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_secs(1));
    assert_eq!(expected, decision);
}
1174
/// Under `RetrySpec::v2_1()`, a server error carrying an explicit 3s delay on the first
/// attempt must be retried after exactly that 3s delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_1_retry_after_bounded_between_t_i_and_t_i_plus_5s() {
    let classifier_result = RetryAction::retryable_error_with_explicit_delay(
        ErrorKind::ServerError,
        Duration::from_secs(3),
    );
    let (mut cfg, rc, ctx) = setup_test(
        vec![classifier_result],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(3)
            .with_retry_spec(RetrySpec::v2_1()),
    );
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    let decision = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_secs(3));
    assert_eq!(expected, decision);
}
1197
/// Under `RetrySpec::v2_1()`, a server error carrying an explicit 10ms delay on the first
/// attempt must be retried after 50ms instead of the shorter explicit delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_1_retry_after_below_t_i_uses_t_i() {
    let classifier_result = RetryAction::retryable_error_with_explicit_delay(
        ErrorKind::ServerError,
        Duration::from_millis(10),
    );
    let (mut cfg, rc, ctx) = setup_test(
        vec![classifier_result],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(3)
            .with_retry_spec(RetrySpec::v2_1()),
    );
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    let decision = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_millis(50));
    assert_eq!(expected, decision);
}
1223
/// Under `RetrySpec::v2_1()`, a server error carrying an explicit 10s delay on the first
/// attempt must be retried after 5050ms instead of the longer explicit delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_1_retry_after_above_t_i_plus_5s_capped() {
    let classifier_result = RetryAction::retryable_error_with_explicit_delay(
        ErrorKind::ServerError,
        Duration::from_secs(10),
    );
    let (mut cfg, rc, ctx) = setup_test(
        vec![classifier_result],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(3)
            .with_retry_spec(RetrySpec::v2_1()),
    );
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    let decision = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_millis(5050));
    assert_eq!(expected, decision);
}
1249
/// Under `RetrySpec::v2_0()`, a server error carrying an explicit 30s delay on the first
/// attempt must be retried after 20s instead of the longer explicit delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn v2_0_retry_after_capped_at_max_backoff() {
    let classifier_result = RetryAction::retryable_error_with_explicit_delay(
        ErrorKind::ServerError,
        Duration::from_secs(30),
    );
    let (mut cfg, rc, ctx) = setup_test(
        vec![classifier_result],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(3)
            .with_retry_spec(RetrySpec::v2_0()),
    );
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::default());
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    let decision = strategy
        .should_attempt_retry(&ctx, &rc, &cfg)
        .expect("method is infallible for this use");

    let expected = ShouldAttempt::YesAfterDelay(Duration::from_secs(20));
    assert_eq!(expected, decision);
}
1275
/// With long polling enabled and a `TokenBucket::new(0)` bucket, a server error must not be
/// retried, but the `LongPollingBackoff` stored in the config bag must receive a 50ms delay.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn long_polling_backs_off_when_token_bucket_empty() {
    let (mut cfg, rc, ctx) = setup_test(
        vec![RetryAction::server_error()],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(5)
            .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
    );
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::new(0));
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    // Keep one handle locally; the clone placed in the bag is what the strategy sees.
    let backoff_hint = LongPollingBackoff::default();
    cfg.interceptor_state().store_put(backoff_hint.clone());

    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(backoff_hint.take(), Some(Duration::from_millis(50)));
}
1296
/// Under `RetrySpec::v2_0()` (long polling not enabled) and a `TokenBucket::new(0)` bucket,
/// a server error must simply not be retried.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn non_long_polling_no_backoff_when_token_bucket_empty() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5)
        .with_retry_spec(RetrySpec::v2_0());
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    cfg.interceptor_state().store_put(TokenBucket::new(0));
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
}
1314
/// Builds a `TokenBucket` holding `capacity` permits whose regular, throttling and timeout
/// retry costs come from `DEFAULT_RETRY_COST`, `THROTTLING_RETRY_COST` and
/// `DEFAULT_RETRY_TIMEOUT_COST`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
fn v2_1_token_bucket_with_capacity(capacity: usize) -> TokenBucket {
    let regular_cost = DEFAULT_RETRY_COST;
    let throttling_cost = THROTTLING_RETRY_COST;
    let timeout_cost = DEFAULT_RETRY_TIMEOUT_COST;

    TokenBucket::builder()
        .capacity(capacity)
        .retry_cost(regular_cost)
        .throttling_retry_cost(throttling_cost)
        .timeout_retry_cost(timeout_cost)
        .build()
}
1324
/// Builds the token bucket used by the v2.1 tests, sized to `DEFAULT_CAPACITY`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
fn v2_1_token_bucket() -> TokenBucket {
    let capacity = DEFAULT_CAPACITY;
    v2_1_token_bucket_with_capacity(capacity)
}
1329
/// Two server errors followed by a success.
///
/// The first two attempts must be retried after 50ms and 100ms, each charging
/// `DEFAULT_RETRY_COST` permits. Once the third attempt succeeds there is no retry, and the
/// bucket ends up only one `DEFAULT_RETRY_COST` below `DEFAULT_CAPACITY`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn retry_eventually_succeeds() {
    let (mut cfg, rc, mut ctx) = setup_test(
        vec![RetryAction::server_error()],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_retry_spec(RetrySpec::v2_1()),
    );
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket();
    cfg.interceptor_state().store_put(bucket.clone());
    let retry_cost = DEFAULT_RETRY_COST as usize;

    // (attempt number, expected delay in milliseconds, retries charged so far)
    for (attempt, delay_ms, retries_charged) in [(1, 50, 1), (2, 100, 2)] {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(decision.expect_delay(), Duration::from_millis(delay_ms));
        assert_eq!(
            bucket.available_permits(),
            DEFAULT_CAPACITY - retries_charged * retry_cost
        );
    }

    // Third attempt succeeds.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(3));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), DEFAULT_CAPACITY - retry_cost);
}
1368
/// Three consecutive server errors with no explicit `max_attempts` configured.
///
/// The first two attempts must be retried after 50ms and 100ms, each charging
/// `DEFAULT_RETRY_COST` permits. The third attempt must be refused, and the bucket must
/// remain two `DEFAULT_RETRY_COST` below `DEFAULT_CAPACITY`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn fail_due_to_max_attempts_reached() {
    let (mut cfg, rc, ctx) = setup_test(
        vec![RetryAction::server_error()],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_retry_spec(RetrySpec::v2_1()),
    );
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket();
    cfg.interceptor_state().store_put(bucket.clone());
    let retry_cost = DEFAULT_RETRY_COST as usize;

    // (attempt number, expected delay in milliseconds, retries charged so far)
    for (attempt, delay_ms, retries_charged) in [(1, 50, 1), (2, 100, 2)] {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(decision.expect_delay(), Duration::from_millis(delay_ms));
        assert_eq!(
            bucket.available_permits(),
            DEFAULT_CAPACITY - retries_charged * retry_cost
        );
    }

    // Third failure: refused, and nothing more is charged.
    cfg.interceptor_state().store_put(RequestAttempts::new(3));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), DEFAULT_CAPACITY - 2 * retry_cost);
}
1406
/// A bucket holding exactly `DEFAULT_RETRY_COST` permits pays for one retry only.
///
/// The first server error must be retried after 50ms, draining the bucket to 0. The second
/// must be refused while the bucket stays at 0.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn retry_quota_reached_after_single_retry() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5)
        .with_retry_spec(RetrySpec::v2_1());
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket_with_capacity(DEFAULT_RETRY_COST as usize);
    cfg.interceptor_state().store_put(bucket.clone());

    // First failure consumes the entire quota.
    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(decision.expect_delay(), Duration::from_millis(50));
    assert_eq!(bucket.available_permits(), 0);

    // Second failure finds the bucket empty.
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), 0);
}
1431
/// With a zero-capacity bucket, the very first server error must be refused and the bucket
/// must stay at 0 permits.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn no_retries_if_retry_quota_is_zero() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_retry_spec(RetrySpec::v2_1());
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket_with_capacity(0);
    cfg.interceptor_state().store_put(bucket.clone());

    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), 0);
}
1450
/// A 20-permit bucket pays for a single retry.
///
/// The first server error must be retried after 50ms, leaving `20 - DEFAULT_RETRY_COST`
/// permits. The second must be refused and the permit count must not change.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn retry_stops_after_retry_quota_exhaustion() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5)
        .with_retry_spec(RetrySpec::v2_1());
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket_with_capacity(20);
    cfg.interceptor_state().store_put(bucket.clone());
    let permits_after_one_retry = 20 - DEFAULT_RETRY_COST as usize;

    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(decision.expect_delay(), Duration::from_millis(50));
    assert_eq!(bucket.available_permits(), permits_after_one_retry);

    // The remaining permits cannot cover another retry.
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), permits_after_one_retry);
}
1475
/// Tracks a 30-permit bucket across two request sequences handled by the same strategy.
///
/// Sequence one: two server errors (retried after 50ms and 100ms, each charging
/// `DEFAULT_RETRY_COST`), then a success that brings the bucket back to
/// `30 - DEFAULT_RETRY_COST`. Sequence two: one server error (50ms, bucket down to
/// `30 - 2 * DEFAULT_RETRY_COST`), then a success that again leaves
/// `30 - DEFAULT_RETRY_COST`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn retry_quota_recovery_after_successful_responses() {
    let (mut cfg, rc, mut ctx) = setup_test(
        vec![RetryAction::server_error()],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(5)
            .with_retry_spec(RetrySpec::v2_1()),
    );
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket_with_capacity(30);
    cfg.interceptor_state().store_put(bucket.clone());
    let retry_cost = DEFAULT_RETRY_COST as usize;

    // Sequence one, first failure.
    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(decision.expect_delay(), Duration::from_millis(50));
    assert_eq!(bucket.available_permits(), 30 - retry_cost);

    // Sequence one, second failure.
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(decision.expect_delay(), Duration::from_millis(100));
    assert_eq!(bucket.available_permits(), 30 - 2 * retry_cost);

    // Sequence one, success on the third attempt.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(3));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), 30 - retry_cost);

    // Sequence two, first failure.
    ctx.set_output_or_error(Err(OrchestratorError::other("doesn't matter")));
    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(decision.expect_delay(), Duration::from_millis(50));
    assert_eq!(bucket.available_permits(), 30 - 2 * retry_cost);

    // Sequence two, success on the second attempt.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), 30 - retry_cost);
}
1518
/// A throttling error followed by a success.
///
/// The throttling error must be retried after 1s and charge `THROTTLING_RETRY_COST` permits.
/// After the following success there is no retry and the bucket is back at
/// `DEFAULT_CAPACITY`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn throttling_error_token_bucket_drain_and_backoff() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_retry_spec(RetrySpec::v2_1());
    let (mut cfg, rc, mut ctx) = setup_test(
        vec![RetryAction::retryable_error(ErrorKind::ThrottlingError)],
        retry_config,
    );
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket();
    cfg.interceptor_state().store_put(bucket.clone());

    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(decision.expect_delay(), Duration::from_secs(1));
    let throttling_cost = THROTTLING_RETRY_COST as usize;
    assert_eq!(
        bucket.available_permits(),
        DEFAULT_CAPACITY - throttling_cost
    );

    // Second attempt succeeds.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), DEFAULT_CAPACITY);
}
1546
/// With long polling enabled and a zero-capacity bucket, a throttling error must not be
/// retried, the `LongPollingBackoff` stored in the config bag must receive a 1s delay, and
/// the bucket must stay at 0 permits.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn long_polling_backoff_after_throttling_error_when_token_bucket_empty() {
    let (mut cfg, rc, ctx) = setup_test(
        vec![RetryAction::retryable_error(ErrorKind::ThrottlingError)],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(5)
            .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
    );
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket_with_capacity(0);
    cfg.interceptor_state().store_put(bucket.clone());
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    // Keep one handle locally; the clone placed in the bag is what the strategy sees.
    let backoff_hint = LongPollingBackoff::default();
    cfg.interceptor_state().store_put(backoff_hint.clone());

    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(backoff_hint.take(), Some(Duration::from_secs(1)));
    assert_eq!(bucket.available_permits(), 0);
}
1569
/// With long polling enabled and `max_attempts` set to 2, the first server error must be
/// retried after 50ms and the second must be refused.
///
/// NOTE(review): no `LongPollingBackoff` is stored in the config bag here, so the
/// "must not delay" part of the name is only checked through the `ShouldAttempt::No` result.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn long_polling_max_attempts_exceeded_must_not_delay() {
    let (mut cfg, rc, ctx) = setup_test(
        vec![RetryAction::server_error()],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(2)
            .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
    );
    let strategy = StandardRetryStrategy::new();
    // The bucket is never inspected afterwards, so it is moved into the bag, not cloned.
    cfg.interceptor_state().store_put(v2_1_token_bucket());

    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));

    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);
}
1592
/// With long polling enabled, a server error on the first attempt must be retried after
/// 50ms, and a success on the second attempt must produce no retry.
///
/// NOTE(review): no `LongPollingBackoff` is stored in the config bag here, so the
/// "must not delay" part of the name is only checked through the `ShouldAttempt::No` result.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn long_polling_success_must_not_delay() {
    let (mut cfg, rc, mut ctx) = setup_test(
        vec![RetryAction::server_error()],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(2)
            .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
    );
    let strategy = StandardRetryStrategy::new();
    // The bucket is never inspected afterwards, so it is moved into the bag, not cloned.
    cfg.interceptor_state().store_put(v2_1_token_bucket());

    cfg.interceptor_state().store_put(RequestAttempts::new(1));
    let should_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(should_retry.expect_delay(), Duration::from_millis(50));

    // Second attempt succeeds.
    ctx.set_output_or_error(Ok(Output::doesnt_matter()));
    cfg.interceptor_state().store_put(RequestAttempts::new(2));
    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);
}
1616
/// With long polling enabled, an error classified as `RetryAction::NoActionIndicated` must
/// not be retried.
///
/// NOTE(review): no `LongPollingBackoff` is stored in the config bag here, so the
/// "must not delay" part of the name is only checked through the `ShouldAttempt::No` result.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn long_polling_non_retryable_errors_must_not_delay() {
    let (mut cfg, rc, ctx) = setup_test(
        vec![RetryAction::NoActionIndicated],
        RetryConfig::standard()
            .with_use_static_exponential_base(true)
            .with_max_attempts(2)
            .with_retry_spec(RetrySpec::v2_1().with_long_polling(true)),
    );
    let strategy = StandardRetryStrategy::new();
    // The bucket is never inspected afterwards, so it is moved into the bag, not cloned.
    cfg.interceptor_state().store_put(v2_1_token_bucket());
    cfg.interceptor_state().store_put(RequestAttempts::new(1));

    let no_retry = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(no_retry, ShouldAttempt::No);
}
1635
/// Checks that a 200ms `max_backoff` caps the v2.1 delay schedule.
///
/// Attempts 1 through 4 must be retried after 50ms, 100ms, 200ms and 200ms, each charging
/// `DEFAULT_RETRY_COST` permits. Attempt 5 reaches `max_attempts` and must be refused,
/// leaving the bucket four `DEFAULT_RETRY_COST` below `DEFAULT_CAPACITY`.
#[cfg(any(feature = "test-util", feature = "legacy-test-util"))]
#[test]
fn verify_max_backoff_time() {
    let retry_config = RetryConfig::standard()
        .with_use_static_exponential_base(true)
        .with_max_attempts(5)
        .with_max_backoff(Duration::from_millis(200))
        .with_retry_spec(RetrySpec::v2_1());
    let (mut cfg, rc, ctx) = setup_test(vec![RetryAction::server_error()], retry_config);
    let strategy = StandardRetryStrategy::new();
    let bucket = v2_1_token_bucket();
    cfg.interceptor_state().store_put(bucket.clone());
    let retry_cost = DEFAULT_RETRY_COST as usize;

    // (attempt number, expected delay in milliseconds, retries charged so far)
    let schedule = [(1, 50, 1), (2, 100, 2), (3, 200, 3), (4, 200, 4)];
    for (attempt, delay_ms, retries_charged) in schedule {
        cfg.interceptor_state()
            .store_put(RequestAttempts::new(attempt));
        let decision = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
        assert_eq!(decision.expect_delay(), Duration::from_millis(delay_ms));
        assert_eq!(
            bucket.available_permits(),
            DEFAULT_CAPACITY - retries_charged * retry_cost
        );
    }

    // Fifth failure: refused, and nothing more is charged.
    cfg.interceptor_state().store_put(RequestAttempts::new(5));
    let verdict = strategy.should_attempt_retry(&ctx, &rc, &cfg).unwrap();
    assert_eq!(verdict, ShouldAttempt::No);
    assert_eq!(bucket.available_permits(), DEFAULT_CAPACITY - 4 * retry_cost);
}
1692}