tower/ready_cache/cache.rs
1//! A cache of services.
2
3use super::error;
4use futures_core::Stream;
5use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
6pub use indexmap::Equivalent;
7use indexmap::IndexMap;
8use std::fmt;
9use std::future::Future;
10use std::hash::Hash;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use tower_service::Service;
16use tracing::{debug, trace};
17
18/// Drives readiness over a set of services.
19///
20/// The cache maintains two internal data structures:
21///
22/// * a set of _pending_ services that have not yet become ready; and
23/// * a set of _ready_ services that have previously polled ready.
24///
25/// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it
26/// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked,
27/// pending services are polled and added to the _ready set_.
28///
29/// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a
30/// request to the specified service, but panics if the specified service is not
31/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
32/// that a service is ready before dispatching a request.
33///
34/// The ready set can hold services for an arbitrarily long time. During this
35/// time, the runtime may process events that invalidate that ready state (for
36/// instance, if a keepalive detects a lost connection). In such cases, callers
37/// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`])
38/// immediately before dispatching a request to ensure that the service has not
39/// become unavailable.
40///
41/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
42/// the _pending_ set to be driven to readiness again.
43///
44/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
45/// specified service is _not_ ready. If an error is returned, this indicates that
46/// the server failed and has been removed from the cache entirely.
47///
48/// [`ReadyCache::evict`] can be used to remove a service from the cache (by key),
49/// though the service may not be dropped (if it is currently pending) until
50/// [`ReadyCache::poll_pending`] is invoked.
51///
52/// Note that the by-index accessors are provided to support use cases (like
53/// power-of-two-choices load balancing) where the caller does not care to keep
54/// track of each service's key. Instead, it needs only to access _some_ ready
55/// service. In such a case, it should be noted that calls to
56/// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of
57/// the ready set, so any cached indexes should be discarded after such a call.
58pub struct ReadyCache<K, S, Req>
59where
60 K: Eq + Hash,
61{
62 /// A stream of services that are not yet ready.
63 pending: FuturesUnordered<Pending<K, S, Req>>,
64 /// An index of cancelation handles for pending streams.
65 pending_cancel_txs: IndexMap<K, CancelTx>,
66
67 /// Services that have previously become ready. Readiness can become stale,
68 /// so a given service should be polled immediately before use.
69 ///
70 /// The cancelation oneshot is preserved (though unused) while the service is
71 /// ready so that it need not be reallocated each time a request is
72 /// dispatched.
73 ready: IndexMap<K, (S, CancelPair)>,
74}
75
76// Safety: This is safe because we do not use `Pin::new_unchecked`.
77impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
78
79#[derive(Debug)]
80struct Cancel {
81 waker: AtomicWaker,
82 canceled: AtomicBool,
83}
84
85#[derive(Debug)]
86struct CancelRx(Arc<Cancel>);
87
88#[derive(Debug)]
89struct CancelTx(Arc<Cancel>);
90
91type CancelPair = (CancelTx, CancelRx);
92
93#[derive(Debug)]
94enum PendingError<K, E> {
95 Canceled(K),
96 Inner(K, E),
97}
98
99pin_project_lite::pin_project! {
100 /// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
101 ///
102 /// May fail due to cancelation, i.e. if the service is evicted from the balancer.
103 struct Pending<K, S, Req> {
104 key: Option<K>,
105 cancel: Option<CancelRx>,
106 ready: Option<S>,
107 _pd: std::marker::PhantomData<Req>,
108 }
109}
110
111// === ReadyCache ===
112
113impl<K, S, Req> Default for ReadyCache<K, S, Req>
114where
115 K: Eq + Hash,
116 S: Service<Req>,
117{
118 fn default() -> Self {
119 Self {
120 ready: IndexMap::default(),
121 pending: FuturesUnordered::new(),
122 pending_cancel_txs: IndexMap::default(),
123 }
124 }
125}
126
127impl<K, S, Req> fmt::Debug for ReadyCache<K, S, Req>
128where
129 K: fmt::Debug + Eq + Hash,
130 S: fmt::Debug,
131{
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 let Self {
134 pending,
135 pending_cancel_txs,
136 ready,
137 } = self;
138 f.debug_struct("ReadyCache")
139 .field("pending", pending)
140 .field("pending_cancel_txs", pending_cancel_txs)
141 .field("ready", ready)
142 .finish()
143 }
144}
145
146impl<K, S, Req> ReadyCache<K, S, Req>
147where
148 K: Eq + Hash,
149{
150 /// Returns the total number of services in the cache.
151 pub fn len(&self) -> usize {
152 self.ready_len() + self.pending_len()
153 }
154
155 /// Returns whether or not there are any services in the cache.
156 pub fn is_empty(&self) -> bool {
157 self.ready.is_empty() && self.pending.is_empty()
158 }
159
160 /// Returns the number of services in the ready set.
161 pub fn ready_len(&self) -> usize {
162 self.ready.len()
163 }
164
165 /// Returns the number of services in the unready set.
166 pub fn pending_len(&self) -> usize {
167 self.pending.len()
168 }
169
170 /// Returns true iff the given key is in the unready set.
171 pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
172 self.pending_cancel_txs.contains_key(key)
173 }
174
175 /// Obtains a reference to a service in the ready set by key.
176 pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
177 self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
178 }
179
180 /// Obtains a mutable reference to a service in the ready set by key.
181 pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
182 &mut self,
183 key: &Q,
184 ) -> Option<(usize, &K, &mut S)> {
185 self.ready
186 .get_full_mut(key)
187 .map(|(i, k, v)| (i, k, &mut v.0))
188 }
189
190 /// Obtains a reference to a service in the ready set by index.
191 pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
192 self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
193 }
194
195 /// Obtains a mutable reference to a service in the ready set by index.
196 pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&K, &mut S)> {
197 self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
198 }
199
200 /// Returns an iterator over the ready keys and services.
201 pub fn iter_ready(&self) -> impl Iterator<Item = (&K, &S)> {
202 self.ready.iter().map(|(k, s)| (k, &s.0))
203 }
204
205 /// Returns a mutable iterator over the ready keys and services.
206 pub fn iter_ready_mut(&mut self) -> impl Iterator<Item = (&K, &mut S)> {
207 self.ready.iter_mut().map(|(k, s)| (k, &mut s.0))
208 }
209
210 /// Evicts an item from the cache.
211 ///
212 /// Returns true if a service was marked for eviction.
213 ///
214 /// Services are dropped from the ready set immediately. Services in the
215 /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
216 /// must be called to cause the service to be dropped.
217 pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
218 let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
219 c.cancel();
220 true
221 } else {
222 false
223 };
224
225 self.ready
226 .swap_remove_full(key)
227 .map(|_| true)
228 .unwrap_or(canceled)
229 }
230}
231
232impl<K, S, Req> ReadyCache<K, S, Req>
233where
234 K: Clone + Eq + Hash,
235 S: Service<Req>,
236 <S as Service<Req>>::Error: Into<crate::BoxError>,
237 S::Error: Into<crate::BoxError>,
238{
239 /// Pushes a new service onto the pending set.
240 ///
241 /// The service will be promoted to the ready set as [`poll_pending`] is invoked.
242 ///
243 /// Note that this does **not** remove services from the ready set. Once the
244 /// old service is used, it will be dropped instead of being added back to
245 /// the pending set; OR, when the new service becomes ready, it will replace
246 /// the prior service in the ready set.
247 ///
248 /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
249 pub fn push(&mut self, key: K, svc: S) {
250 let cancel = cancelable();
251 self.push_pending(key, svc, cancel);
252 }
253
254 fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
255 if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
256 // If there is already a service for this key, cancel it.
257 c.cancel();
258 }
259 self.pending.push(Pending {
260 key: Some(key),
261 cancel: Some(cancel_rx),
262 ready: Some(svc),
263 _pd: std::marker::PhantomData,
264 });
265 }
266
267 /// Polls services pending readiness, adding ready services to the ready set.
268 ///
269 /// Returns [`Poll::Ready`] when there are no remaining unready services.
270 /// [`poll_pending`] should be called again after [`push`] or
271 /// [`call_ready_index`] are invoked.
272 ///
273 /// Failures indicate that an individual pending service failed to become
274 /// ready (and has been removed from the cache). In such a case,
275 /// [`poll_pending`] should typically be called again to continue driving
276 /// pending services to readiness.
277 ///
278 /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
279 /// [`push`]: crate::ready_cache::cache::ReadyCache::push
280 /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index
281 pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
282 loop {
283 match Pin::new(&mut self.pending).poll_next(cx) {
284 Poll::Pending => return Poll::Pending,
285 Poll::Ready(None) => return Poll::Ready(Ok(())),
286 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
287 trace!("endpoint ready");
288 let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
289 if let Some(cancel_tx) = cancel_tx {
290 // Keep track of the cancelation so that it need not be
291 // recreated after the service is used.
292 self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
293 } else {
294 assert!(
295 cancel_tx.is_some(),
296 "services that become ready must have a pending cancelation"
297 );
298 }
299 }
300 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
301 debug!("endpoint canceled");
302 // The cancellation for this service was removed in order to
303 // cause this cancellation.
304 }
305 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
306 let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
307 assert!(
308 cancel_tx.is_some(),
309 "services that return an error must have a pending cancelation"
310 );
311 return Err(error::Failed(key, e.into())).into();
312 }
313 }
314 }
315 }
316
317 /// Checks whether the referenced endpoint is ready.
318 ///
319 /// Returns true if the endpoint is ready and false if it is not. An error is
320 /// returned if the endpoint fails.
321 pub fn check_ready<Q: Hash + Equivalent<K>>(
322 &mut self,
323 cx: &mut Context<'_>,
324 key: &Q,
325 ) -> Result<bool, error::Failed<K>> {
326 match self.ready.get_full_mut(key) {
327 Some((index, _, _)) => self.check_ready_index(cx, index),
328 None => Ok(false),
329 }
330 }
331
332 /// Checks whether the referenced endpoint is ready.
333 ///
334 /// If the service is no longer ready, it is moved back into the pending set
335 /// and `false` is returned.
336 ///
337 /// If the service errors, it is removed and dropped and the error is returned.
338 pub fn check_ready_index(
339 &mut self,
340 cx: &mut Context<'_>,
341 index: usize,
342 ) -> Result<bool, error::Failed<K>> {
343 let svc = match self.ready.get_index_mut(index) {
344 None => return Ok(false),
345 Some((_, (svc, _))) => svc,
346 };
347 match svc.poll_ready(cx) {
348 Poll::Ready(Ok(())) => Ok(true),
349 Poll::Pending => {
350 // became unready; so move it back there.
351 let (key, (svc, cancel)) = self
352 .ready
353 .swap_remove_index(index)
354 .expect("invalid ready index");
355
356 // If a new version of this service has been added to the
357 // unready set, don't overwrite it.
358 if !self.pending_contains(&key) {
359 self.push_pending(key, svc, cancel);
360 }
361
362 Ok(false)
363 }
364 Poll::Ready(Err(e)) => {
365 // failed, so drop it.
366 let (key, _) = self
367 .ready
368 .swap_remove_index(index)
369 .expect("invalid ready index");
370 Err(error::Failed(key, e.into()))
371 }
372 }
373 }
374
375 /// Calls a ready service by key.
376 ///
377 /// # Panics
378 ///
379 /// If the specified key does not exist in the ready
380 pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
381 let (index, _, _) = self
382 .ready
383 .get_full_mut(key)
384 .expect("check_ready was not called");
385 self.call_ready_index(index, req)
386 }
387
388 /// Calls a ready service by index.
389 ///
390 /// # Panics
391 ///
392 /// If the specified index is out of range.
393 pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
394 let (key, (mut svc, cancel)) = self
395 .ready
396 .swap_remove_index(index)
397 .expect("check_ready_index was not called");
398
399 let fut = svc.call(req);
400
401 // If a new version of this service has been added to the
402 // unready set, don't overwrite it.
403 if !self.pending_contains(&key) {
404 self.push_pending(key, svc, cancel);
405 }
406
407 fut
408 }
409}
410
411// === impl Cancel ===
412
413/// Creates a cancelation sender and receiver.
414///
415/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
416/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
417/// the state to be observed as soon as the cancelation is triggered.
418fn cancelable() -> CancelPair {
419 let cx = Arc::new(Cancel {
420 waker: AtomicWaker::new(),
421 canceled: AtomicBool::new(false),
422 });
423 (CancelTx(cx.clone()), CancelRx(cx))
424}
425
426impl CancelTx {
427 fn cancel(self) {
428 self.0.canceled.store(true, Ordering::SeqCst);
429 self.0.waker.wake();
430 }
431}
432
433// === Pending ===
434
435impl<K, S, Req> Future for Pending<K, S, Req>
436where
437 S: Service<Req>,
438{
439 type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
440
441 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
442 let this = self.project();
443 // Before checking whether the service is ready, check to see whether
444 // readiness has been canceled.
445 let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
446 if cancel.canceled.load(Ordering::SeqCst) {
447 let key = this.key.take().expect("polled after complete");
448 return Err(PendingError::Canceled(key)).into();
449 }
450
451 match this
452 .ready
453 .as_mut()
454 .expect("polled after ready")
455 .poll_ready(cx)
456 {
457 Poll::Pending => {
458 // Before returning Pending, register interest in cancelation so
459 // that this future is polled again if the state changes.
460 let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
461 cancel.waker.register(cx.waker());
462 // Because both the cancel receiver and cancel sender are held
463 // by the `ReadyCache` (i.e., on a single task), then it must
464 // not be possible for the cancelation state to change while
465 // polling a `Pending` service.
466 assert!(
467 !cancel.canceled.load(Ordering::SeqCst),
468 "cancelation cannot be notified while polling a pending service"
469 );
470 Poll::Pending
471 }
472 Poll::Ready(Ok(())) => {
473 let key = this.key.take().expect("polled after complete");
474 let cancel = this.cancel.take().expect("polled after complete");
475 Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
476 }
477 Poll::Ready(Err(e)) => {
478 let key = this.key.take().expect("polled after compete");
479 Err(PendingError::Inner(key, e)).into()
480 }
481 }
482 }
483}
484
485impl<K, S, Req> fmt::Debug for Pending<K, S, Req>
486where
487 K: fmt::Debug,
488 S: fmt::Debug,
489{
490 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
491 let Self {
492 key,
493 cancel,
494 ready,
495 _pd,
496 } = self;
497 f.debug_struct("Pending")
498 .field("key", key)
499 .field("cancel", cancel)
500 .field("ready", ready)
501 .finish()
502 }
503}