tower/ready_cache/
cache.rs

1//! A cache of services.
2
3use super::error;
4use futures_core::Stream;
5use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
6pub use indexmap::Equivalent;
7use indexmap::IndexMap;
8use std::fmt;
9use std::future::Future;
10use std::hash::Hash;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use tower_service::Service;
16use tracing::{debug, trace};
17
/// Drives readiness over a set of services.
///
/// The cache maintains two internal data structures:
///
/// * a set of _pending_ services that have not yet become ready; and
/// * a set of _ready_ services that have previously polled ready.
///
/// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it
/// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked,
/// pending services are polled and those that report readiness are moved to
/// the _ready set_.
///
/// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a
/// request to the specified service, but panics if the specified service is not
/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
/// that a service is ready before dispatching a request.
///
/// The ready set can hold services for an arbitrarily long time. During this
/// time, the runtime may process events that invalidate that ready state (for
/// instance, if a keepalive detects a lost connection). In such cases, callers
/// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`])
/// immediately before dispatching a request to ensure that the service has not
/// become unavailable.
///
/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
/// the _pending_ set to be driven to readiness again.
///
/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
/// specified service is _not_ ready (a service that was in the ready set is
/// moved back to the pending set). If an error is returned, this indicates that
/// the service failed and has been removed from the cache entirely.
///
/// [`ReadyCache::evict`] can be used to remove a service from the cache (by key),
/// though the service may not be dropped (if it is currently pending) until
/// [`ReadyCache::poll_pending`] is invoked.
///
/// Note that the by-index accessors are provided to support use cases (like
/// power-of-two-choices load balancing) where the caller does not care to keep
/// track of each service's key. Instead, it needs only to access _some_ ready
/// service. In such a case, it should be noted that calls to
/// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of
/// the ready set, so any cached indexes should be discarded after such a call.
pub struct ReadyCache<K, S, Req>
where
    K: Eq + Hash,
{
    /// A stream of futures, one per service that is not yet ready. Each
    /// future resolves when its service becomes ready, fails, or is canceled.
    pending: FuturesUnordered<Pending<K, S, Req>>,
    /// Cancelation senders for the pending services, indexed by service key.
    /// Removing and firing an entry cancels the matching pending future.
    pending_cancel_txs: IndexMap<K, CancelTx>,

    /// Services that have previously become ready. Readiness can become stale,
    /// so a given service should be polled immediately before use.
    ///
    /// The cancelation pair is preserved (though unused) while the service is
    /// ready so that it need not be reallocated each time a request is
    /// dispatched and the service returns to the pending set.
    ready: IndexMap<K, (S, CancelPair)>,
}
75
76// Safety: This is safe because we do not use `Pin::new_unchecked`.
77impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
78
/// Cancelation state shared (via `Arc`) between a `CancelTx` and its
/// matching `CancelRx`.
#[derive(Debug)]
struct Cancel {
    // Waker registered by a `Pending` future before it returns
    // `Poll::Pending`; woken by `CancelTx::cancel`.
    waker: AtomicWaker,
    // Starts as `false`; set to `true` by `CancelTx::cancel`.
    canceled: AtomicBool,
}
84
/// Receiving half of a cancelation pair.
///
/// Held by a `Pending` future, which reads the shared `canceled` flag each
/// time it is polled.
#[derive(Debug)]
struct CancelRx(Arc<Cancel>);
87
/// Sending half of a cancelation pair.
///
/// Consumed by `CancelTx::cancel`, which sets the shared `canceled` flag and
/// wakes the registered waker.
#[derive(Debug)]
struct CancelTx(Arc<Cancel>);
90
/// A cancelation sender together with the receiver that shares its state,
/// as produced by `cancelable`.
type CancelPair = (CancelTx, CancelRx);
92
/// The ways a `Pending` future can fail; both variants carry the key of the
/// affected service.
#[derive(Debug)]
enum PendingError<K, E> {
    /// The cancelation flag was observed to be set when the future was polled.
    Canceled(K),
    /// The service's `poll_ready` returned the contained error.
    Inner(K, E),
}
98
pin_project_lite::pin_project! {
    /// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
    ///
    /// On success it yields the key, the service, and the cancelation
    /// receiver. It may instead fail because the service's readiness check
    /// returned an error, or due to cancelation, e.g. if the service is
    /// evicted from the cache or replaced by a newer service with the same key.
    struct Pending<K, S, Req> {
        // Key of the service; taken when the future completes.
        key: Option<K>,
        // Cancelation receiver; its flag is checked at the start of every poll.
        cancel: Option<CancelRx>,
        // The service being driven to readiness (the service itself, not a flag).
        ready: Option<S>,
        // Records the request type without storing a request.
        _pd: std::marker::PhantomData<Req>,
    }
}
110
111// === ReadyCache ===
112
113impl<K, S, Req> Default for ReadyCache<K, S, Req>
114where
115    K: Eq + Hash,
116    S: Service<Req>,
117{
118    fn default() -> Self {
119        Self {
120            ready: IndexMap::default(),
121            pending: FuturesUnordered::new(),
122            pending_cancel_txs: IndexMap::default(),
123        }
124    }
125}
126
127impl<K, S, Req> fmt::Debug for ReadyCache<K, S, Req>
128where
129    K: fmt::Debug + Eq + Hash,
130    S: fmt::Debug,
131{
132    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133        let Self {
134            pending,
135            pending_cancel_txs,
136            ready,
137        } = self;
138        f.debug_struct("ReadyCache")
139            .field("pending", pending)
140            .field("pending_cancel_txs", pending_cancel_txs)
141            .field("ready", ready)
142            .finish()
143    }
144}
145
146impl<K, S, Req> ReadyCache<K, S, Req>
147where
148    K: Eq + Hash,
149{
150    /// Returns the total number of services in the cache.
151    pub fn len(&self) -> usize {
152        self.ready_len() + self.pending_len()
153    }
154
155    /// Returns whether or not there are any services in the cache.
156    pub fn is_empty(&self) -> bool {
157        self.ready.is_empty() && self.pending.is_empty()
158    }
159
160    /// Returns the number of services in the ready set.
161    pub fn ready_len(&self) -> usize {
162        self.ready.len()
163    }
164
165    /// Returns the number of services in the unready set.
166    pub fn pending_len(&self) -> usize {
167        self.pending.len()
168    }
169
170    /// Returns true iff the given key is in the unready set.
171    pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
172        self.pending_cancel_txs.contains_key(key)
173    }
174
175    /// Obtains a reference to a service in the ready set by key.
176    pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
177        self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
178    }
179
180    /// Obtains a mutable reference to a service in the ready set by key.
181    pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
182        &mut self,
183        key: &Q,
184    ) -> Option<(usize, &K, &mut S)> {
185        self.ready
186            .get_full_mut(key)
187            .map(|(i, k, v)| (i, k, &mut v.0))
188    }
189
190    /// Obtains a reference to a service in the ready set by index.
191    pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
192        self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
193    }
194
195    /// Obtains a mutable reference to a service in the ready set by index.
196    pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&K, &mut S)> {
197        self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
198    }
199
200    /// Returns an iterator over the ready keys and services.
201    pub fn iter_ready(&self) -> impl Iterator<Item = (&K, &S)> {
202        self.ready.iter().map(|(k, s)| (k, &s.0))
203    }
204
205    /// Returns a mutable iterator over the ready keys and services.
206    pub fn iter_ready_mut(&mut self) -> impl Iterator<Item = (&K, &mut S)> {
207        self.ready.iter_mut().map(|(k, s)| (k, &mut s.0))
208    }
209
210    /// Evicts an item from the cache.
211    ///
212    /// Returns true if a service was marked for eviction.
213    ///
214    /// Services are dropped from the ready set immediately. Services in the
215    /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
216    /// must be called to cause the service to be dropped.
217    pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
218        let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
219            c.cancel();
220            true
221        } else {
222            false
223        };
224
225        self.ready
226            .swap_remove_full(key)
227            .map(|_| true)
228            .unwrap_or(canceled)
229    }
230}
231
232impl<K, S, Req> ReadyCache<K, S, Req>
233where
234    K: Clone + Eq + Hash,
235    S: Service<Req>,
236    <S as Service<Req>>::Error: Into<crate::BoxError>,
237    S::Error: Into<crate::BoxError>,
238{
239    /// Pushes a new service onto the pending set.
240    ///
241    /// The service will be promoted to the ready set as [`poll_pending`] is invoked.
242    ///
243    /// Note that this does **not** remove services from the ready set. Once the
244    /// old service is used, it will be dropped instead of being added back to
245    /// the pending set; OR, when the new service becomes ready, it will replace
246    /// the prior service in the ready set.
247    ///
248    /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
249    pub fn push(&mut self, key: K, svc: S) {
250        let cancel = cancelable();
251        self.push_pending(key, svc, cancel);
252    }
253
254    fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
255        if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
256            // If there is already a service for this key, cancel it.
257            c.cancel();
258        }
259        self.pending.push(Pending {
260            key: Some(key),
261            cancel: Some(cancel_rx),
262            ready: Some(svc),
263            _pd: std::marker::PhantomData,
264        });
265    }
266
267    /// Polls services pending readiness, adding ready services to the ready set.
268    ///
269    /// Returns [`Poll::Ready`] when there are no remaining unready services.
270    /// [`poll_pending`] should be called again after [`push`] or
271    /// [`call_ready_index`] are invoked.
272    ///
273    /// Failures indicate that an individual pending service failed to become
274    /// ready (and has been removed from the cache). In such a case,
275    /// [`poll_pending`] should typically be called again to continue driving
276    /// pending services to readiness.
277    ///
278    /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
279    /// [`push`]: crate::ready_cache::cache::ReadyCache::push
280    /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index
281    pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
282        loop {
283            match Pin::new(&mut self.pending).poll_next(cx) {
284                Poll::Pending => return Poll::Pending,
285                Poll::Ready(None) => return Poll::Ready(Ok(())),
286                Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
287                    trace!("endpoint ready");
288                    let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
289                    if let Some(cancel_tx) = cancel_tx {
290                        // Keep track of the cancelation so that it need not be
291                        // recreated after the service is used.
292                        self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
293                    } else {
294                        assert!(
295                            cancel_tx.is_some(),
296                            "services that become ready must have a pending cancelation"
297                        );
298                    }
299                }
300                Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
301                    debug!("endpoint canceled");
302                    // The cancellation for this service was removed in order to
303                    // cause this cancellation.
304                }
305                Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
306                    let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
307                    assert!(
308                        cancel_tx.is_some(),
309                        "services that return an error must have a pending cancelation"
310                    );
311                    return Err(error::Failed(key, e.into())).into();
312                }
313            }
314        }
315    }
316
317    /// Checks whether the referenced endpoint is ready.
318    ///
319    /// Returns true if the endpoint is ready and false if it is not. An error is
320    /// returned if the endpoint fails.
321    pub fn check_ready<Q: Hash + Equivalent<K>>(
322        &mut self,
323        cx: &mut Context<'_>,
324        key: &Q,
325    ) -> Result<bool, error::Failed<K>> {
326        match self.ready.get_full_mut(key) {
327            Some((index, _, _)) => self.check_ready_index(cx, index),
328            None => Ok(false),
329        }
330    }
331
332    /// Checks whether the referenced endpoint is ready.
333    ///
334    /// If the service is no longer ready, it is moved back into the pending set
335    /// and `false` is returned.
336    ///
337    /// If the service errors, it is removed and dropped and the error is returned.
338    pub fn check_ready_index(
339        &mut self,
340        cx: &mut Context<'_>,
341        index: usize,
342    ) -> Result<bool, error::Failed<K>> {
343        let svc = match self.ready.get_index_mut(index) {
344            None => return Ok(false),
345            Some((_, (svc, _))) => svc,
346        };
347        match svc.poll_ready(cx) {
348            Poll::Ready(Ok(())) => Ok(true),
349            Poll::Pending => {
350                // became unready; so move it back there.
351                let (key, (svc, cancel)) = self
352                    .ready
353                    .swap_remove_index(index)
354                    .expect("invalid ready index");
355
356                // If a new version of this service has been added to the
357                // unready set, don't overwrite it.
358                if !self.pending_contains(&key) {
359                    self.push_pending(key, svc, cancel);
360                }
361
362                Ok(false)
363            }
364            Poll::Ready(Err(e)) => {
365                // failed, so drop it.
366                let (key, _) = self
367                    .ready
368                    .swap_remove_index(index)
369                    .expect("invalid ready index");
370                Err(error::Failed(key, e.into()))
371            }
372        }
373    }
374
375    /// Calls a ready service by key.
376    ///
377    /// # Panics
378    ///
379    /// If the specified key does not exist in the ready
380    pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
381        let (index, _, _) = self
382            .ready
383            .get_full_mut(key)
384            .expect("check_ready was not called");
385        self.call_ready_index(index, req)
386    }
387
388    /// Calls a ready service by index.
389    ///
390    /// # Panics
391    ///
392    /// If the specified index is out of range.
393    pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
394        let (key, (mut svc, cancel)) = self
395            .ready
396            .swap_remove_index(index)
397            .expect("check_ready_index was not called");
398
399        let fut = svc.call(req);
400
401        // If a new version of this service has been added to the
402        // unready set, don't overwrite it.
403        if !self.pending_contains(&key) {
404            self.push_pending(key, svc, cancel);
405        }
406
407        fut
408    }
409}
410
411// === impl Cancel ===
412
413/// Creates a cancelation sender and receiver.
414///
415/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
416/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
417/// the state to be observed as soon as the cancelation is triggered.
418fn cancelable() -> CancelPair {
419    let cx = Arc::new(Cancel {
420        waker: AtomicWaker::new(),
421        canceled: AtomicBool::new(false),
422    });
423    (CancelTx(cx.clone()), CancelRx(cx))
424}
425
426impl CancelTx {
427    fn cancel(self) {
428        self.0.canceled.store(true, Ordering::SeqCst);
429        self.0.waker.wake();
430    }
431}
432
433// === Pending ===
434
435impl<K, S, Req> Future for Pending<K, S, Req>
436where
437    S: Service<Req>,
438{
439    type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
440
441    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442        let this = self.project();
443        // Before checking whether the service is ready, check to see whether
444        // readiness has been canceled.
445        let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
446        if cancel.canceled.load(Ordering::SeqCst) {
447            let key = this.key.take().expect("polled after complete");
448            return Err(PendingError::Canceled(key)).into();
449        }
450
451        match this
452            .ready
453            .as_mut()
454            .expect("polled after ready")
455            .poll_ready(cx)
456        {
457            Poll::Pending => {
458                // Before returning Pending, register interest in cancelation so
459                // that this future is polled again if the state changes.
460                let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
461                cancel.waker.register(cx.waker());
462                // Because both the cancel receiver and cancel sender are held
463                // by the `ReadyCache` (i.e., on a single task), then it must
464                // not be possible for the cancelation state to change while
465                // polling a `Pending` service.
466                assert!(
467                    !cancel.canceled.load(Ordering::SeqCst),
468                    "cancelation cannot be notified while polling a pending service"
469                );
470                Poll::Pending
471            }
472            Poll::Ready(Ok(())) => {
473                let key = this.key.take().expect("polled after complete");
474                let cancel = this.cancel.take().expect("polled after complete");
475                Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
476            }
477            Poll::Ready(Err(e)) => {
478                let key = this.key.take().expect("polled after compete");
479                Err(PendingError::Inner(key, e)).into()
480            }
481        }
482    }
483}
484
485impl<K, S, Req> fmt::Debug for Pending<K, S, Req>
486where
487    K: fmt::Debug,
488    S: fmt::Debug,
489{
490    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
491        let Self {
492            key,
493            cancel,
494            ready,
495            _pd,
496        } = self;
497        f.debug_struct("Pending")
498            .field("key", key)
499            .field("cancel", cancel)
500            .field("ready", ready)
501            .finish()
502    }
503}