tower/ready_cache/cache.rs
1//! A cache of services.
2
3use super::error;
4use futures_core::Stream;
5use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
6pub use indexmap::Equivalent;
7use indexmap::IndexMap;
8use std::fmt;
9use std::future::Future;
10use std::hash::Hash;
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use tower_service::Service;
16use tracing::{debug, trace};
17
/// Drives readiness over a set of services.
///
/// The cache maintains two internal data structures:
///
/// * a set of _pending_ services that have not yet become ready; and
/// * a set of _ready_ services that have previously polled ready.
///
/// As each `S` typed [`Service`] is added to the cache via [`ReadyCache::push`], it
/// is added to the _pending set_. As [`ReadyCache::poll_pending`] is invoked,
/// pending services are polled and added to the _ready set_.
///
/// [`ReadyCache::call_ready`] (or [`ReadyCache::call_ready_index`]) dispatches a
/// request to the specified service, but panics if the specified service is not
/// in the ready set. The `ReadyCache::check_*` functions can be used to ensure
/// that a service is ready before dispatching a request.
///
/// The ready set can hold services for an arbitrarily long time. During this
/// time, the runtime may process events that invalidate that ready state (for
/// instance, if a keepalive detects a lost connection). In such cases, callers
/// should use [`ReadyCache::check_ready`] (or [`ReadyCache::check_ready_index`])
/// immediately before dispatching a request to ensure that the service has not
/// become unavailable.
///
/// Once `ReadyCache::call_ready*` is invoked, the service is placed back into
/// the _pending_ set to be driven to readiness again.
///
/// When `ReadyCache::check_ready*` returns `false`, it indicates that the
/// specified service is _not_ ready. If an error is returned, this indicates
/// that the service failed and has been removed from the cache entirely.
///
/// [`ReadyCache::evict`] can be used to remove a service from the cache (by key),
/// though the service may not be dropped (if it is currently pending) until
/// [`ReadyCache::poll_pending`] is invoked.
///
/// Note that the by-index accessors are provided to support use cases (like
/// power-of-two-choices load balancing) where the caller does not care to keep
/// track of each service's key. Instead, it needs only to access _some_ ready
/// service. In such a case, it should be noted that calls to
/// [`ReadyCache::poll_pending`] and [`ReadyCache::evict`] may perturb the order of
/// the ready set, so any cached indexes should be discarded after such a call.
/// The `check_ready*` and `call_ready*` methods may also swap-remove entries
/// from the ready set, which likewise invalidates previously obtained indexes.
pub struct ReadyCache<K, S, Req>
where
    K: Eq + Hash,
{
    /// A stream of services that are not yet ready.
    pending: FuturesUnordered<Pending<K, S, Req>>,
    /// An index of cancelation handles for pending streams.
    ///
    /// A key is present here exactly while a (non-canceled) service for that
    /// key is being driven in `pending`.
    pending_cancel_txs: IndexMap<K, CancelTx>,

    /// Services that have previously become ready. Readiness can become stale,
    /// so a given service should be polled immediately before use.
    ///
    /// The cancelation pair is preserved (though unused) while the service is
    /// ready so that it need not be reallocated each time a request is
    /// dispatched.
    ready: IndexMap<K, (S, CancelPair)>,
}
75
76// Safety: This is safe because we do not use `Pin::new_unchecked`.
77impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}
78
/// Cancelation state shared (via `Arc`) between a [`CancelTx`] and a
/// [`CancelRx`].
#[derive(Debug)]
struct Cancel {
    /// Waker registered by a pending future when it returns `Poll::Pending`;
    /// woken by `CancelTx::cancel`.
    waker: AtomicWaker,
    /// Set to `true` by `CancelTx::cancel`; checked by `Pending::poll`.
    canceled: AtomicBool,
}
84
/// The receiving half of a cancelation pair.
///
/// Held by a [`Pending`] future, which reads the shared `canceled` flag and
/// registers its waker through this handle.
#[derive(Debug)]
struct CancelRx(Arc<Cancel>);
87
/// The sending half of a cancelation pair.
///
/// Consumed by `CancelTx::cancel`, which sets the shared `canceled` flag and
/// wakes the registered waker.
#[derive(Debug)]
struct CancelTx(Arc<Cancel>);
90
/// Both halves of a cancelation handle, as produced by `cancelable()`.
type CancelPair = (CancelTx, CancelRx);
92
/// The ways a [`Pending`] future can fail; each variant carries the key of
/// the service it was driving.
#[derive(Debug)]
enum PendingError<K, E> {
    /// The cancelation flag was observed set before the service was polled.
    Canceled(K),
    /// The service's `poll_ready` returned the contained error.
    Inner(K, E),
}
98
pin_project_lite::pin_project! {
    /// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
    ///
    /// May fail due to cancelation, i.e. if the service is evicted from the balancer.
    ///
    /// Every field is an `Option` so that its value can be moved out when the
    /// future completes; polling again afterwards panics.
    struct Pending<K, S, Req> {
        // Key under which the service is tracked; taken on completion
        // (success, cancelation, or failure).
        key: Option<K>,
        // Receiving half of the cancelation handle; returned with the service
        // on success.
        cancel: Option<CancelRx>,
        // The service being driven to readiness; taken on success.
        ready: Option<S>,
        // Ties the future to the request type without storing a request.
        _pd: std::marker::PhantomData<Req>,
    }
}
110
111// === ReadyCache ===
112
113impl<K, S, Req> Default for ReadyCache<K, S, Req>
114where
115 K: Eq + Hash,
116 S: Service<Req>,
117{
118 fn default() -> Self {
119 Self {
120 ready: IndexMap::default(),
121 pending: FuturesUnordered::new(),
122 pending_cancel_txs: IndexMap::default(),
123 }
124 }
125}
126
127impl<K, S, Req> fmt::Debug for ReadyCache<K, S, Req>
128where
129 K: fmt::Debug + Eq + Hash,
130 S: fmt::Debug,
131{
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 let Self {
134 pending,
135 pending_cancel_txs,
136 ready,
137 } = self;
138 f.debug_struct("ReadyCache")
139 .field("pending", pending)
140 .field("pending_cancel_txs", pending_cancel_txs)
141 .field("ready", ready)
142 .finish()
143 }
144}
145
146impl<K, S, Req> ReadyCache<K, S, Req>
147where
148 K: Eq + Hash,
149{
150 /// Returns the total number of services in the cache.
151 pub fn len(&self) -> usize {
152 self.ready_len() + self.pending_len()
153 }
154
155 /// Returns whether or not there are any services in the cache.
156 pub fn is_empty(&self) -> bool {
157 self.ready.is_empty() && self.pending.is_empty()
158 }
159
160 /// Returns the number of services in the ready set.
161 pub fn ready_len(&self) -> usize {
162 self.ready.len()
163 }
164
165 /// Returns the number of services in the unready set.
166 pub fn pending_len(&self) -> usize {
167 self.pending.len()
168 }
169
170 /// Returns true iff the given key is in the unready set.
171 pub fn pending_contains<Q: Hash + Equivalent<K>>(&self, key: &Q) -> bool {
172 self.pending_cancel_txs.contains_key(key)
173 }
174
175 /// Obtains a reference to a service in the ready set by key.
176 pub fn get_ready<Q: Hash + Equivalent<K>>(&self, key: &Q) -> Option<(usize, &K, &S)> {
177 self.ready.get_full(key).map(|(i, k, v)| (i, k, &v.0))
178 }
179
180 /// Obtains a mutable reference to a service in the ready set by key.
181 pub fn get_ready_mut<Q: Hash + Equivalent<K>>(
182 &mut self,
183 key: &Q,
184 ) -> Option<(usize, &K, &mut S)> {
185 self.ready
186 .get_full_mut(key)
187 .map(|(i, k, v)| (i, k, &mut v.0))
188 }
189
190 /// Obtains a reference to a service in the ready set by index.
191 pub fn get_ready_index(&self, idx: usize) -> Option<(&K, &S)> {
192 self.ready.get_index(idx).map(|(k, v)| (k, &v.0))
193 }
194
195 /// Obtains a mutable reference to a service in the ready set by index.
196 pub fn get_ready_index_mut(&mut self, idx: usize) -> Option<(&mut K, &mut S)> {
197 self.ready.get_index_mut(idx).map(|(k, v)| (k, &mut v.0))
198 }
199
200 /// Evicts an item from the cache.
201 ///
202 /// Returns true if a service was marked for eviction.
203 ///
204 /// Services are dropped from the ready set immediately. Services in the
205 /// pending set are marked for cancellation, but [`ReadyCache::poll_pending`]
206 /// must be called to cause the service to be dropped.
207 pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
208 let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
209 c.cancel();
210 true
211 } else {
212 false
213 };
214
215 self.ready
216 .swap_remove_full(key)
217 .map(|_| true)
218 .unwrap_or(canceled)
219 }
220}
221
222impl<K, S, Req> ReadyCache<K, S, Req>
223where
224 K: Clone + Eq + Hash,
225 S: Service<Req>,
226 <S as Service<Req>>::Error: Into<crate::BoxError>,
227 S::Error: Into<crate::BoxError>,
228{
229 /// Pushes a new service onto the pending set.
230 ///
231 /// The service will be promoted to the ready set as [`poll_pending`] is invoked.
232 ///
233 /// Note that this does **not** remove services from the ready set. Once the
234 /// old service is used, it will be dropped instead of being added back to
235 /// the pending set; OR, when the new service becomes ready, it will replace
236 /// the prior service in the ready set.
237 ///
238 /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
239 pub fn push(&mut self, key: K, svc: S) {
240 let cancel = cancelable();
241 self.push_pending(key, svc, cancel);
242 }
243
244 fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
245 if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
246 // If there is already a service for this key, cancel it.
247 c.cancel();
248 }
249 self.pending.push(Pending {
250 key: Some(key),
251 cancel: Some(cancel_rx),
252 ready: Some(svc),
253 _pd: std::marker::PhantomData,
254 });
255 }
256
257 /// Polls services pending readiness, adding ready services to the ready set.
258 ///
259 /// Returns [`Poll::Ready`] when there are no remaining unready services.
260 /// [`poll_pending`] should be called again after [`push`] or
261 /// [`call_ready_index`] are invoked.
262 ///
263 /// Failures indicate that an individual pending service failed to become
264 /// ready (and has been removed from the cache). In such a case,
265 /// [`poll_pending`] should typically be called again to continue driving
266 /// pending services to readiness.
267 ///
268 /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
269 /// [`push`]: crate::ready_cache::cache::ReadyCache::push
270 /// [`call_ready_index`]: crate::ready_cache::cache::ReadyCache::call_ready_index
271 pub fn poll_pending(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), error::Failed<K>>> {
272 loop {
273 match Pin::new(&mut self.pending).poll_next(cx) {
274 Poll::Pending => return Poll::Pending,
275 Poll::Ready(None) => return Poll::Ready(Ok(())),
276 Poll::Ready(Some(Ok((key, svc, cancel_rx)))) => {
277 trace!("endpoint ready");
278 let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
279 if let Some(cancel_tx) = cancel_tx {
280 // Keep track of the cancelation so that it need not be
281 // recreated after the service is used.
282 self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
283 } else {
284 assert!(
285 cancel_tx.is_some(),
286 "services that become ready must have a pending cancelation"
287 );
288 }
289 }
290 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
291 debug!("endpoint canceled");
292 // The cancellation for this service was removed in order to
293 // cause this cancellation.
294 }
295 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
296 let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
297 assert!(
298 cancel_tx.is_some(),
299 "services that return an error must have a pending cancelation"
300 );
301 return Err(error::Failed(key, e.into())).into();
302 }
303 }
304 }
305 }
306
307 /// Checks whether the referenced endpoint is ready.
308 ///
309 /// Returns true if the endpoint is ready and false if it is not. An error is
310 /// returned if the endpoint fails.
311 pub fn check_ready<Q: Hash + Equivalent<K>>(
312 &mut self,
313 cx: &mut Context<'_>,
314 key: &Q,
315 ) -> Result<bool, error::Failed<K>> {
316 match self.ready.get_full_mut(key) {
317 Some((index, _, _)) => self.check_ready_index(cx, index),
318 None => Ok(false),
319 }
320 }
321
322 /// Checks whether the referenced endpoint is ready.
323 ///
324 /// If the service is no longer ready, it is moved back into the pending set
325 /// and `false` is returned.
326 ///
327 /// If the service errors, it is removed and dropped and the error is returned.
328 pub fn check_ready_index(
329 &mut self,
330 cx: &mut Context<'_>,
331 index: usize,
332 ) -> Result<bool, error::Failed<K>> {
333 let svc = match self.ready.get_index_mut(index) {
334 None => return Ok(false),
335 Some((_, (svc, _))) => svc,
336 };
337 match svc.poll_ready(cx) {
338 Poll::Ready(Ok(())) => Ok(true),
339 Poll::Pending => {
340 // became unready; so move it back there.
341 let (key, (svc, cancel)) = self
342 .ready
343 .swap_remove_index(index)
344 .expect("invalid ready index");
345
346 // If a new version of this service has been added to the
347 // unready set, don't overwrite it.
348 if !self.pending_contains(&key) {
349 self.push_pending(key, svc, cancel);
350 }
351
352 Ok(false)
353 }
354 Poll::Ready(Err(e)) => {
355 // failed, so drop it.
356 let (key, _) = self
357 .ready
358 .swap_remove_index(index)
359 .expect("invalid ready index");
360 Err(error::Failed(key, e.into()))
361 }
362 }
363 }
364
365 /// Calls a ready service by key.
366 ///
367 /// # Panics
368 ///
369 /// If the specified key does not exist in the ready
370 pub fn call_ready<Q: Hash + Equivalent<K>>(&mut self, key: &Q, req: Req) -> S::Future {
371 let (index, _, _) = self
372 .ready
373 .get_full_mut(key)
374 .expect("check_ready was not called");
375 self.call_ready_index(index, req)
376 }
377
378 /// Calls a ready service by index.
379 ///
380 /// # Panics
381 ///
382 /// If the specified index is out of range.
383 pub fn call_ready_index(&mut self, index: usize, req: Req) -> S::Future {
384 let (key, (mut svc, cancel)) = self
385 .ready
386 .swap_remove_index(index)
387 .expect("check_ready_index was not called");
388
389 let fut = svc.call(req);
390
391 // If a new version of this service has been added to the
392 // unready set, don't overwrite it.
393 if !self.pending_contains(&key) {
394 self.push_pending(key, svc, cancel);
395 }
396
397 fut
398 }
399}
400
401// === impl Cancel ===
402
403/// Creates a cancelation sender and receiver.
404///
405/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
406/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
407/// the state to be observed as soon as the cancelation is triggered.
408fn cancelable() -> CancelPair {
409 let cx = Arc::new(Cancel {
410 waker: AtomicWaker::new(),
411 canceled: AtomicBool::new(false),
412 });
413 (CancelTx(cx.clone()), CancelRx(cx))
414}
415
416impl CancelTx {
417 fn cancel(self) {
418 self.0.canceled.store(true, Ordering::SeqCst);
419 self.0.waker.wake();
420 }
421}
422
423// === Pending ===
424
425impl<K, S, Req> Future for Pending<K, S, Req>
426where
427 S: Service<Req>,
428{
429 type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
430
431 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
432 let this = self.project();
433 // Before checking whether the service is ready, check to see whether
434 // readiness has been canceled.
435 let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
436 if cancel.canceled.load(Ordering::SeqCst) {
437 let key = this.key.take().expect("polled after complete");
438 return Err(PendingError::Canceled(key)).into();
439 }
440
441 match this
442 .ready
443 .as_mut()
444 .expect("polled after ready")
445 .poll_ready(cx)
446 {
447 Poll::Pending => {
448 // Before returning Pending, register interest in cancelation so
449 // that this future is polled again if the state changes.
450 let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
451 cancel.waker.register(cx.waker());
452 // Because both the cancel receiver and cancel sender are held
453 // by the `ReadyCache` (i.e., on a single task), then it must
454 // not be possible for the cancelation state to change while
455 // polling a `Pending` service.
456 assert!(
457 !cancel.canceled.load(Ordering::SeqCst),
458 "cancelation cannot be notified while polling a pending service"
459 );
460 Poll::Pending
461 }
462 Poll::Ready(Ok(())) => {
463 let key = this.key.take().expect("polled after complete");
464 let cancel = this.cancel.take().expect("polled after complete");
465 Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
466 }
467 Poll::Ready(Err(e)) => {
468 let key = this.key.take().expect("polled after compete");
469 Err(PendingError::Inner(key, e)).into()
470 }
471 }
472 }
473}
474
475impl<K, S, Req> fmt::Debug for Pending<K, S, Req>
476where
477 K: fmt::Debug,
478 S: fmt::Debug,
479{
480 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
481 let Self {
482 key,
483 cancel,
484 ready,
485 _pd,
486 } = self;
487 f.debug_struct("Pending")
488 .field("key", key)
489 .field("cancel", cancel)
490 .field("ready", ready)
491 .finish()
492 }
493}