moka/
future.rs

1//! Provides a thread-safe, concurrent asynchronous (futures aware) cache
2//! implementation.
3//!
4//! To use this module, enable a crate feature called "future".
5
6use crossbeam_channel::Sender;
7use futures_util::future::{BoxFuture, Shared};
8use std::{future::Future, hash::Hash, sync::Arc};
9
10use crate::common::{concurrent::WriteOp, time::Instant};
11
12mod base_cache;
13mod builder;
14mod cache;
15mod entry_selector;
16mod housekeeper;
17mod invalidator;
18mod key_lock;
19mod notifier;
20mod value_initializer;
21
22pub use {
23    builder::CacheBuilder,
24    cache::Cache,
25    entry_selector::{OwnedKeyEntrySelector, RefKeyEntrySelector},
26};
27
/// The type of the unique ID that identifies a predicate used by the
/// [`Cache::invalidate_entries_if`][invalidate-if] method.
///
/// A `PredicateId` is a `String` containing a UUID (version 4).
///
/// [invalidate-if]: ./struct.Cache.html#method.invalidate_entries_if
pub type PredicateId = String;
35
// Crate-internal alias for a borrowed string slice.
// NOTE(review): judging by its name, this is the borrowed counterpart of
// `PredicateId` — confirm against its users.
pub(crate) type PredicateIdStr<'a> = &'a str;
37
// Empty marker struct to be used in `InitResult::InitErr` to represent the case
// where an `Option` was `None`.
pub(crate) struct OptionallyNone;
40
// Empty marker struct to be used in `InitResult::InitErr` to represent the case
// where a compute operation produced `None`.
// NOTE(review): "compute" presumably refers to the cache's compute-style entry
// API — confirm against the users of `InitResult`.
pub(crate) struct ComputeNone;
43
44impl<T: ?Sized> FutureExt for T where T: Future {}
45
46pub trait FutureExt: Future {
47    fn boxed<'a, T>(self) -> BoxFuture<'a, T>
48    where
49        Self: Future<Output = T> + Sized + Send + 'a,
50    {
51        Box::pin(self)
52    }
53}
54
55/// Iterator visiting all key-value pairs in a cache in arbitrary order.
56///
57/// Call [`Cache::iter`](./struct.Cache.html#method.iter) method to obtain an `Iter`.
58pub struct Iter<'i, K, V>(crate::sync_base::iter::Iter<'i, K, V>);
59
60impl<'i, K, V> Iter<'i, K, V> {
61    pub(crate) fn new(inner: crate::sync_base::iter::Iter<'i, K, V>) -> Self {
62        Self(inner)
63    }
64}
65
66impl<K, V> Iterator for Iter<'_, K, V>
67where
68    K: Eq + Hash + Send + Sync + 'static,
69    V: Clone + Send + Sync + 'static,
70{
71    type Item = (Arc<K>, V);
72
73    fn next(&mut self) -> Option<Self::Item> {
74        self.0.next()
75    }
76}
77
/// Operation that has been interrupted (stopped polling) by async cancellation.
///
/// Values of this type are built by the `Drop` implementation of `CancelGuard`
/// and sent to the interrupted operations channel.
pub(crate) enum InterruptedOp<K, V> {
    /// Built when the dropped guard was still holding both a shared future and a
    /// write op.
    CallEvictionListener {
        /// Timestamp copied from the guard.
        ts: Instant,
        /// The shared future the guard was still holding.
        // 'static means that the future can capture only owned value and/or static
        // references. No non-static references are allowed.
        future: Shared<BoxFuture<'static, ()>>,
        /// The write op the guard was still holding.
        op: WriteOp<K, V>,
    },
    /// Built when the dropped guard was holding a write op but no shared future.
    SendWriteOp {
        /// Timestamp copied from the guard.
        ts: Instant,
        /// The write op the guard was still holding.
        op: WriteOp<K, V>,
    },
}
92
93/// Drop guard for an async task being performed. If this guard is dropped while it
94/// is still having the shared `future` or the write `op`, it will convert them to an
95/// `InterruptedOp` and send it to the interrupted operations channel. Later, the
96/// interrupted op will be retried by `retry_interrupted_ops` method of
97/// `BaseCache`.
98struct CancelGuard<'a, K, V> {
99    interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>,
100    ts: Instant,
101    future: Option<Shared<BoxFuture<'static, ()>>>,
102    op: Option<WriteOp<K, V>>,
103}
104
105impl<'a, K, V> CancelGuard<'a, K, V> {
106    fn new(interrupted_op_ch: &'a Sender<InterruptedOp<K, V>>, ts: Instant) -> Self {
107        Self {
108            interrupted_op_ch,
109            ts,
110            future: None,
111            op: None,
112        }
113    }
114
115    fn set_future_and_op(&mut self, future: Shared<BoxFuture<'static, ()>>, op: WriteOp<K, V>) {
116        self.future = Some(future);
117        self.op = Some(op);
118    }
119
120    fn set_op(&mut self, op: WriteOp<K, V>) {
121        self.op = Some(op);
122    }
123
124    fn unset_future(&mut self) {
125        self.future = None;
126    }
127
128    fn clear(&mut self) {
129        self.future = None;
130        self.op = None;
131    }
132}
133
134impl<K, V> Drop for CancelGuard<'_, K, V> {
135    fn drop(&mut self) {
136        let interrupted_op = match (self.future.take(), self.op.take()) {
137            (Some(future), Some(op)) => InterruptedOp::CallEvictionListener {
138                ts: self.ts,
139                future,
140                op,
141            },
142            (None, Some(op)) => InterruptedOp::SendWriteOp { ts: self.ts, op },
143            _ => return,
144        };
145
146        self.interrupted_op_ch
147            .send(interrupted_op)
148            .expect("Failed to send a pending op");
149    }
150}