kube_client/client/
retry.rs1use std::time::Duration;
34
35use http::{Request, Response, StatusCode};
36use tower::{
37 BoxError,
38 retry::{
39 Policy,
40 backoff::{Backoff, ExponentialBackoff, ExponentialBackoffMaker, MakeBackoff},
41 },
42 util::rng::HasherRng,
43};
44
45use super::Body;
46
47pub use tower::retry::backoff::InvalidBackoff;
49
50#[derive(Clone)]
60pub struct RetryPolicy {
61 backoff: ExponentialBackoff,
62 current_attempt: u32,
63 max_retries: u32,
64}
65
66impl RetryPolicy {
67 pub fn new(min_delay: Duration, max_delay: Duration, max_retries: u32) -> Result<Self, InvalidBackoff> {
79 let backoff =
80 ExponentialBackoffMaker::new(min_delay, max_delay, 2.0, HasherRng::new())?.make_backoff();
81
82 Ok(Self {
83 backoff,
84 current_attempt: 0,
85 max_retries,
86 })
87 }
88
89 fn is_retryable_status(status: StatusCode) -> bool {
91 matches!(
92 status,
93 StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE | StatusCode::GATEWAY_TIMEOUT
94 )
95 }
96}
97
98impl Default for RetryPolicy {
99 fn default() -> Self {
106 Self::new(Duration::from_millis(500), Duration::from_secs(5), 3)
107 .expect("default RetryPolicy parameters are valid")
108 }
109}
110
111impl<Res> Policy<Request<Body>, Response<Res>, BoxError> for RetryPolicy {
112 type Future = tokio::time::Sleep;
113
114 fn retry(
115 &mut self,
116 _req: &mut Request<Body>,
117 result: &mut Result<Response<Res>, BoxError>,
118 ) -> Option<Self::Future> {
119 match result {
120 Ok(response)
121 if Self::is_retryable_status(response.status())
122 && self.current_attempt < self.max_retries =>
123 {
124 self.current_attempt += 1;
125 Some(self.backoff.next_backoff())
126 }
127 _ => None,
128 }
129 }
130
131 fn clone_request(&mut self, req: &Request<Body>) -> Option<Request<Body>> {
132 let body = req.body().try_clone()?;
134
135 let mut builder = Request::builder()
136 .method(req.method().clone())
137 .uri(req.uri().clone())
138 .version(req.version());
139
140 if let Some(headers) = builder.headers_mut() {
142 headers.extend(req.headers().clone());
143 }
144
145 builder.body(body).ok().map(|mut new_req| {
147 *new_req.extensions_mut() = req.extensions().clone();
148 new_req
149 })
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// The default policy allows three retries.
    #[test]
    fn test_default_policy() {
        let RetryPolicy { max_retries, .. } = RetryPolicy::default();
        assert_eq!(max_retries, 3);
    }

    /// Only 429, 503 and 504 are classified as retryable.
    #[test]
    fn test_retryable_status() {
        let retryable = [
            StatusCode::TOO_MANY_REQUESTS,
            StatusCode::SERVICE_UNAVAILABLE,
            StatusCode::GATEWAY_TIMEOUT,
        ];
        for status in retryable {
            assert!(
                RetryPolicy::is_retryable_status(status),
                "{status} should be retryable"
            );
        }

        let not_retryable = [
            StatusCode::OK,
            StatusCode::BAD_REQUEST,
            StatusCode::INTERNAL_SERVER_ERROR,
            StatusCode::NOT_FOUND,
        ];
        for status in not_retryable {
            assert!(
                !RetryPolicy::is_retryable_status(status),
                "{status} should not be retryable"
            );
        }
    }

    /// A minimum delay larger than the maximum delay is rejected.
    #[test]
    fn test_invalid_backoff() {
        let min_delay = Duration::from_secs(10);
        let max_delay = Duration::from_secs(1);

        assert!(RetryPolicy::new(min_delay, max_delay, 3).is_err());
    }
}