Skip to main content

kube_client/client/
retry.rs

1//! Retry policy for Kubernetes API requests.
2//!
3//! This module provides a [`RetryPolicy`] that implements [`tower::retry::Policy`]
4//! for retrying failed Kubernetes API requests with exponential backoff.
5//!
6//! # Example
7//!
8//! ```no_run
9//! use kube::{Client, Config, client::ConfigExt};
10//! use kube::client::retry::RetryPolicy;
11//! use tower::{ServiceBuilder, BoxError};
12//! use tower::retry::RetryLayer;
13//! use tower::buffer::BufferLayer;
14//! use hyper_util::rt::TokioExecutor;
15//!
16//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
17//! let config = Config::infer().await?;
18//! let https = config.rustls_https_connector()?;
19//!
20//! let service = ServiceBuilder::new()
21//!     .layer(config.base_uri_layer())
22//!     .option_layer(config.auth_layer()?)
23//!     .layer(BufferLayer::new(1024))
24//!     .layer(RetryLayer::new(RetryPolicy::default()))
25//!     .map_err(BoxError::from)
26//!     .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
27//!
28//! let client = Client::new(service, config.default_namespace);
29//! # Ok(())
30//! # }
31//! ```
32
33use std::time::Duration;
34
35use http::{Request, Response, StatusCode};
36use tower::{
37    BoxError,
38    retry::{
39        Policy,
40        backoff::{Backoff, ExponentialBackoff, ExponentialBackoffMaker, MakeBackoff},
41    },
42    util::rng::HasherRng,
43};
44
45use super::Body;
46
47/// Backoff configuration validation error.
48pub use tower::retry::backoff::InvalidBackoff;
49
50/// A retry policy for Kubernetes API requests.
51///
52/// This policy retries requests that fail with:
53/// - 429 Too Many Requests
54/// - 503 Service Unavailable
55/// - 504 Gateway Timeout
56///
57/// Uses exponential backoff starting from `min_delay` up to `max_delay`,
58/// with a configurable maximum number of retries.
59#[derive(Clone)]
60pub struct RetryPolicy {
61    backoff: ExponentialBackoff,
62    current_attempt: u32,
63    max_retries: u32,
64}
65
66impl RetryPolicy {
67    /// Create a new retry policy with custom parameters.
68    ///
69    /// # Arguments
70    ///
71    /// * `min_delay` - Initial delay between retries
72    /// * `max_delay` - Maximum delay between retries (cap for exponential growth)
73    /// * `max_retries` - Maximum number of retry attempts
74    ///
75    /// # Errors
76    ///
77    /// Returns [`InvalidBackoff`] if the backoff parameters are invalid.
78    pub fn new(min_delay: Duration, max_delay: Duration, max_retries: u32) -> Result<Self, InvalidBackoff> {
79        let backoff =
80            ExponentialBackoffMaker::new(min_delay, max_delay, 2.0, HasherRng::new())?.make_backoff();
81
82        Ok(Self {
83            backoff,
84            current_attempt: 0,
85            max_retries,
86        })
87    }
88
89    /// Check if the status code is retryable.
90    fn is_retryable_status(status: StatusCode) -> bool {
91        matches!(
92            status,
93            StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE | StatusCode::GATEWAY_TIMEOUT
94        )
95    }
96}
97
98impl Default for RetryPolicy {
99    /// Create a default retry policy.
100    ///
101    /// Default parameters:
102    /// - `min_delay`: 500ms
103    /// - `max_delay`: 5s
104    /// - `max_retries`: 3
105    fn default() -> Self {
106        Self::new(Duration::from_millis(500), Duration::from_secs(5), 3)
107            .expect("default RetryPolicy parameters are valid")
108    }
109}
110
111impl<Res> Policy<Request<Body>, Response<Res>, BoxError> for RetryPolicy {
112    type Future = tokio::time::Sleep;
113
114    fn retry(
115        &mut self,
116        _req: &mut Request<Body>,
117        result: &mut Result<Response<Res>, BoxError>,
118    ) -> Option<Self::Future> {
119        match result {
120            Ok(response)
121                if Self::is_retryable_status(response.status())
122                    && self.current_attempt < self.max_retries =>
123            {
124                self.current_attempt += 1;
125                Some(self.backoff.next_backoff())
126            }
127            _ => None,
128        }
129    }
130
131    fn clone_request(&mut self, req: &Request<Body>) -> Option<Request<Body>> {
132        // Try to clone the body - only Kind::Once bodies can be cloned
133        let body = req.body().try_clone()?;
134
135        let mut builder = Request::builder()
136            .method(req.method().clone())
137            .uri(req.uri().clone())
138            .version(req.version());
139
140        // Copy headers
141        if let Some(headers) = builder.headers_mut() {
142            headers.extend(req.headers().clone());
143        }
144
145        // Copy extensions
146        builder.body(body).ok().map(|mut new_req| {
147            *new_req.extensions_mut() = req.extensions().clone();
148            new_req
149        })
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_policy() {
        let policy = RetryPolicy::default();
        assert_eq!(policy.max_retries, 3);
        // A fresh policy must have its full retry budget available.
        assert_eq!(policy.current_attempt, 0);
    }

    #[test]
    fn test_custom_policy() {
        let policy = RetryPolicy::new(Duration::from_millis(100), Duration::from_secs(1), 7)
            .expect("valid backoff parameters");
        assert_eq!(policy.max_retries, 7);
        assert_eq!(policy.current_attempt, 0);
    }

    #[test]
    fn test_equal_min_and_max_delay_is_valid() {
        // Boundary case: a constant delay (min == max) is a legal configuration.
        let result = RetryPolicy::new(Duration::from_secs(1), Duration::from_secs(1), 3);
        assert!(result.is_ok());
    }

    #[test]
    fn test_retryable_status() {
        assert!(RetryPolicy::is_retryable_status(StatusCode::TOO_MANY_REQUESTS));
        assert!(RetryPolicy::is_retryable_status(StatusCode::SERVICE_UNAVAILABLE));
        assert!(RetryPolicy::is_retryable_status(StatusCode::GATEWAY_TIMEOUT));

        assert!(!RetryPolicy::is_retryable_status(StatusCode::OK));
        assert!(!RetryPolicy::is_retryable_status(StatusCode::BAD_REQUEST));
        assert!(!RetryPolicy::is_retryable_status(
            StatusCode::INTERNAL_SERVER_ERROR
        ));
        assert!(!RetryPolicy::is_retryable_status(StatusCode::BAD_GATEWAY));
        assert!(!RetryPolicy::is_retryable_status(StatusCode::NOT_FOUND));
    }

    #[test]
    fn test_invalid_backoff() {
        let result = RetryPolicy::new(Duration::from_secs(10), Duration::from_secs(1), 3);
        assert!(result.is_err());
    }

    #[test]
    fn test_zero_max_delay_is_invalid() {
        let result = RetryPolicy::new(Duration::ZERO, Duration::ZERO, 3);
        assert!(result.is_err());
    }
}