tower/ready_cache/
cache.rs

1//! A cache of services.
2
3use super::error;
4use futures_core::Stream;
5use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
6pub use indexmap::Equivalent;
7use indexmap::IndexMap;
8use std::fmt;
9use std::future::Future;
10use std::hash::Hash;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use tower_service::Service;
16use tracing::{debug, trace};
17
18/// Drives readiness over a set of services.
19///
20/// The cache maintains two internal data structures:
21///
22/// * a set of _pending_ services that have not yet become ready; and
23/// * a set of _ready_ services that have previously polled ready.
24///
25/// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it
26/// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked,
27/// pending services are polled and added to the _ready set_.
28///
29/// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a
30/// request to the specified service, but panics if the specified service is not
31/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
32/// that a service is ready before dispatching a request.
33///
34/// The ready set can hold services for an abitrarily long time. During this
35/// time, the runtime may process events that invalidate that ready state (for
36/// instance, if a keepalive detects a lost connection). In such cases, callers
37/// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`])
38/// immediately before dispatching a request to ensure that the service has not
39/// become unavailable.
40///
41/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
42/// the _pending_ set to be driven to readiness again.
43///
44/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
45/// specified service is _not_ ready. If an error is returned, this indicats that
46/// the server failed and has been removed from the cache entirely.
47///
48/// [`ReadyCache::evict`] can be used to remove a service from the cache (by key),
49/// though the service may not be dropped (if it is currently pending) until
50/// [`ReadyCache::poll_pending`] is invoked.
51///
52/// Note that the by-index accessors are provided to support use cases (like
53/// power-of-two-choices load balancing) where the caller does not care to keep
54/// track of each service's key. Instead, it needs only to access _some_ ready
55/// service. In such a case, it should be noted that calls to
56/// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of
57/// the ready set, so any cached indexes should be discarded after such a call.
58pub struct ReadyCache<K, S, Req>
59where
60    K: Eq + Hash,
61{
62    /// A stream of services that are not yet ready.
63    pending: FuturesUnordered<Pending<K, S, Req>>,
64    /// An index of cancelation handles for pending streams.
65    pending_cancel_txs: IndexMap<K, CancelTx>,
66
67    /// Services that have previously become ready. Readiness can become stale,
68    /// so a given service should be polled immediately before use.
69    ///
70    /// The cancelation oneshot is preserved (though unused) while the service is
71    /// ready so that it need not be reallocated each time a request is
72    /// dispatched.
73    ready: IndexMap<K, (S, CancelPair)>,
74}
75
76// Safety: This is safe because we do not use `Pin::new_unchecked`.
77impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
78
79#[derive(Debug)]
80struct Cancel {
81    waker: AtomicWaker,
82    canceled: AtomicBool,
83}
84
85#[derive(Debug)]
86struct CancelRx(Arc<Cancel>);
87
88#[derive(Debug)]
89struct CancelTx(Arc<Cancel>);
90
91type CancelPair = (CancelTx, CancelRx);
92
93#[derive(Debug)]
94enum PendingError<K, E> {
95    Canceled(K),
96    Inner(K, E),
97}
98
99pin_project_lite::pin_project! {
100    /// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
101    ///
102    /// May fail due to cancelation, i.e. if the service is evicted from the balancer.
103    struct Pending<K, S, Req> {
104        key: Option<K>,
105        cancel: Option<CancelRx>,
106        ready: Option<S>,
107        _pd: std::marker::PhantomData<Req>,
108    }
109}
110
111// === ReadyCache ===
112
113impl<K, S, Req> Default for ReadyCache<K, S, Req>
114where
115    K: Eq + Hash,
116    S: Service<Req>,
117{
118    fn default() -> Self {
119        Self {
120            ready: IndexMap::default(),
121            pending: FuturesUnordered::new(),
122            pending_cancel_txs: IndexMap::default(),
123        }
124    }
125}
126
127impl<K, S, Req> fmt::Debug for ReadyCache<K, S, Req>
128where
129    K: fmt::Debug + Eq + Hash,
130    S: fmt::Debug,
131{
132    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133        let Self {
134            pending,
135            pending_cancel_txs,
136            ready,
137        } = self;
138        f.debug_struct("ReadyCache")
139            .field("pending", pending)
140            .field("pending_cancel_txs", pending_cancel_txs)
141            .field("ready", ready)
142            .finish()
143    }
144}
145
146impl<K, S, Req> ReadyCache<K, S, Req>
147where
148    K: Eq + Hash,
149{
150    /// Returns the total number of services in the cache.
151    pub fn len(&self) -> usize {
152        self.ready_len() + self.pending_len()
153    }
154
155    /// Returns whether or not there are any services in the cache.
156    pub fn is_empty(&self) -> bool {
157        self.ready.is_empty() && self.pending.is_empty()
158    }
159
160    /// Returns the number of services in the ready set.
161    pub fn ready_len(&self) -> usize {
162        self.ready.len()
163    }
164
165    /// Returns the number of services in the unready set.
166    pub fn pending_len(&self) -> usize {
167        self.pending.len()
168    }
169
170    /// Returns true iff the given key is in the unready set.
171    pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
172        self.pending_cancel_txs.contains_key(key)
173    }
174
175    /// Obtains a reference to a service in the ready set by key.
176    pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
177        self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
178    }
179
180    /// Obtains a mutable reference to a service in the ready set by key.
181    pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
182        &mut self,
183        key: &Q,
184    ) -> Option<(usize, &K, &mut S)> {
185        self.ready
186            .get_full_mut(key)
187            .map(|(i, k, v)| (i, k, &mut v.0))
188    }
189
190    /// Obtains a reference to a service in the ready set by index.
191    pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
192        self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
193    }
194
195    /// Obtains a mutable reference to a service in the ready set by index.
196    pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
197        self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
198    }
199
200    /// Evicts an item from the cache.
201    ///
202    /// Returns true if a service was marked for eviction.
203    ///
204    /// Services are dropped from the ready set immediately. Services in the
205    /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
206    /// must be called to cause the service to be dropped.
207    pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
208        let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
209            c.cancel();
210            true
211        } else {
212            false
213        };
214
215        self.ready
216            .swap_remove_full(key)
217            .map(|_| true)
218            .unwrap_or(canceled)
219    }
220}
221
222impl<K, S, Req> ReadyCache<K, S, Req>
223where
224    K: Clone + Eq + Hash,
225    S: Service<Req>,
226    <S as Service<Req>>::Error: Into<crate::BoxError>,
227    S::Error: Into<crate::BoxError>,
228{
229    /// Pushes a new service onto the pending set.
230    ///
231    /// The service will be promoted to the ready set as [`poll_pending`] is invoked.
232    ///
233    /// Note that this does **not** remove services from the ready set. Once the
234    /// old service is used, it will be dropped instead of being added back to
235    /// the pending set; OR, when the new service becomes ready, it will replace
236    /// the prior service in the ready set.
237    ///
238    /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
239    pub fn push(&mut self, key: K, svc: S) {
240        let cancel = cancelable();
241        self.push_pending(key, svc, cancel);
242    }
243
244    fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
245        if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
246            // If there is already a service for this key, cancel it.
247            c.cancel();
248        }
249        self.pending.push(Pending {
250            key: Some(key),
251            cancel: Some(cancel_rx),
252            ready: Some(svc),
253            _pd: std::marker::PhantomData,
254        });
255    }
256
257    /// Polls services pending readiness, adding ready services to the ready set.
258    ///
259    /// Returns [`Poll::Ready`] when there are no remaining unready services.
260    /// [`poll_pending`] should be called again after [`push`] or
261    /// [`call_ready_index`] are invoked.
262    ///
263    /// Failures indicate that an individual pending service failed to become
264    /// ready (and has been removed from the cache). In such a case,
265    /// [`poll_pending`] should typically be called again to continue driving
266    /// pending services to readiness.
267    ///
268    /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
269    /// [`push`]: crate::ready_cache::cache::ReadyCache::push
270    /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index
271    pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
272        loop {
273            match Pin::new(&mut self.pending).poll_next(cx) {
274                Poll::Pending => return Poll::Pending,
275                Poll::Ready(None) => return Poll::Ready(Ok(())),
276                Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
277                    trace!("endpoint ready");
278                    let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
279                    if let Some(cancel_tx) = cancel_tx {
280                        // Keep track of the cancelation so that it need not be
281                        // recreated after the service is used.
282                        self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
283                    } else {
284                        assert!(
285                            cancel_tx.is_some(),
286                            "services that become ready must have a pending cancelation"
287                        );
288                    }
289                }
290                Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
291                    debug!("endpoint canceled");
292                    // The cancellation for this service was removed in order to
293                    // cause this cancellation.
294                }
295                Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
296                    let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
297                    assert!(
298                        cancel_tx.is_some(),
299                        "services that return an error must have a pending cancelation"
300                    );
301                    return Err(error::Failed(key, e.into())).into();
302                }
303            }
304        }
305    }
306
307    /// Checks whether the referenced endpoint is ready.
308    ///
309    /// Returns true if the endpoint is ready and false if it is not. An error is
310    /// returned if the endpoint fails.
311    pub fn check_ready<Q: Hash + Equivalent<K>>(
312        &mut self,
313        cx: &mut Context<'_>,
314        key: &Q,
315    ) -> Result<bool, error::Failed<K>> {
316        match self.ready.get_full_mut(key) {
317            Some((index, _, _)) => self.check_ready_index(cx, index),
318            None => Ok(false),
319        }
320    }
321
322    /// Checks whether the referenced endpoint is ready.
323    ///
324    /// If the service is no longer ready, it is moved back into the pending set
325    /// and `false` is returned.
326    ///
327    /// If the service errors, it is removed and dropped and the error is returned.
328    pub fn check_ready_index(
329        &mut self,
330        cx: &mut Context<'_>,
331        index: usize,
332    ) -> Result<bool, error::Failed<K>> {
333        let svc = match self.ready.get_index_mut(index) {
334            None => return Ok(false),
335            Some((_, (svc, _))) => svc,
336        };
337        match svc.poll_ready(cx) {
338            Poll::Ready(Ok(())) => Ok(true),
339            Poll::Pending => {
340                // became unready; so move it back there.
341                let (key, (svc, cancel)) = self
342                    .ready
343                    .swap_remove_index(index)
344                    .expect("invalid ready index");
345
346                // If a new version of this service has been added to the
347                // unready set, don't overwrite it.
348                if !self.pending_contains(&key) {
349                    self.push_pending(key, svc, cancel);
350                }
351
352                Ok(false)
353            }
354            Poll::Ready(Err(e)) => {
355                // failed, so drop it.
356                let (key, _) = self
357                    .ready
358                    .swap_remove_index(index)
359                    .expect("invalid ready index");
360                Err(error::Failed(key, e.into()))
361            }
362        }
363    }
364
365    /// Calls a ready service by key.
366    ///
367    /// # Panics
368    ///
369    /// If the specified key does not exist in the ready
370    pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
371        let (index, _, _) = self
372            .ready
373            .get_full_mut(key)
374            .expect("check_ready was not called");
375        self.call_ready_index(index, req)
376    }
377
378    /// Calls a ready service by index.
379    ///
380    /// # Panics
381    ///
382    /// If the specified index is out of range.
383    pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
384        let (key, (mut svc, cancel)) = self
385            .ready
386            .swap_remove_index(index)
387            .expect("check_ready_index was not called");
388
389        let fut = svc.call(req);
390
391        // If a new version of this service has been added to the
392        // unready set, don't overwrite it.
393        if !self.pending_contains(&key) {
394            self.push_pending(key, svc, cancel);
395        }
396
397        fut
398    }
399}
400
401// === impl Cancel ===
402
403/// Creates a cancelation sender and receiver.
404///
405/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
406/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
407/// the state to be observed as soon as the cancelation is triggered.
408fn cancelable() -> CancelPair {
409    let cx = Arc::new(Cancel {
410        waker: AtomicWaker::new(),
411        canceled: AtomicBool::new(false),
412    });
413    (CancelTx(cx.clone()), CancelRx(cx))
414}
415
416impl CancelTx {
417    fn cancel(self) {
418        self.0.canceled.store(true, Ordering::SeqCst);
419        self.0.waker.wake();
420    }
421}
422
423// === Pending ===
424
425impl<K, S, Req> Future for Pending<K, S, Req>
426where
427    S: Service<Req>,
428{
429    type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
430
431    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
432        let this = self.project();
433        // Before checking whether the service is ready, check to see whether
434        // readiness has been canceled.
435        let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
436        if cancel.canceled.load(Ordering::SeqCst) {
437            let key = this.key.take().expect("polled after complete");
438            return Err(PendingError::Canceled(key)).into();
439        }
440
441        match this
442            .ready
443            .as_mut()
444            .expect("polled after ready")
445            .poll_ready(cx)
446        {
447            Poll::Pending => {
448                // Before returning Pending, register interest in cancelation so
449                // that this future is polled again if the state changes.
450                let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
451                cancel.waker.register(cx.waker());
452                // Because both the cancel receiver and cancel sender are held
453                // by the `ReadyCache` (i.e., on a single task), then it must
454                // not be possible for the cancelation state to change while
455                // polling a `Pending` service.
456                assert!(
457                    !cancel.canceled.load(Ordering::SeqCst),
458                    "cancelation cannot be notified while polling a pending service"
459                );
460                Poll::Pending
461            }
462            Poll::Ready(Ok(())) => {
463                let key = this.key.take().expect("polled after complete");
464                let cancel = this.cancel.take().expect("polled after complete");
465                Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
466            }
467            Poll::Ready(Err(e)) => {
468                let key = this.key.take().expect("polled after compete");
469                Err(PendingError::Inner(key, e)).into()
470            }
471        }
472    }
473}
474
475impl<K, S, Req> fmt::Debug for Pending<K, S, Req>
476where
477    K: fmt::Debug,
478    S: fmt::Debug,
479{
480    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481        let Self {
482            key,
483            cancel,
484            ready,
485            _pd,
486        } = self;
487        f.debug_struct("Pending")
488            .field("key", key)
489            .field("cancel", cancel)
490            .field("ready", ready)
491            .finish()
492    }
493}