moka/future/cache.rs
1use super::{
2 base_cache::BaseCache,
3 value_initializer::{InitResult, ValueInitializer},
4 CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector,
5 WriteOp,
6};
7use crate::{
8 common::{concurrent::Weigher, time::Clock, HousekeeperConfig},
9 notification::AsyncEvictionListener,
10 ops::compute::{self, CompResult},
11 policy::{EvictionPolicy, ExpirationPolicy},
12 Entry, Policy, PredicateError,
13};
14
15#[cfg(feature = "unstable-debug-counters")]
16use crate::common::concurrent::debug_counters::CacheDebugStats;
17
18use std::{
19 borrow::Borrow,
20 collections::hash_map::RandomState,
21 fmt,
22 future::Future,
23 hash::{BuildHasher, Hash},
24 pin::Pin,
25 sync::Arc,
26};
27
28#[cfg(test)]
29use std::sync::atomic::{AtomicBool, Ordering};
30
31/// A thread-safe, futures-aware concurrent in-memory cache.
32///
33/// `Cache` supports full concurrency of retrievals and a high expected concurrency
34/// for updates. It utilizes a lock-free concurrent hash table as the central
35/// key-value storage. It performs a best-effort bounding of the map using an entry
36/// replacement algorithm to determine which entries to evict when the capacity is
37/// exceeded.
38///
39/// To use this cache, enable a crate feature called "future".
40///
41/// # Table of Contents
42///
43/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
44/// - [Avoiding cloning the value at `get`](#avoiding-cloning-the-value-at-get)
45/// - [Sharing a cache across asynchronous tasks](#sharing-a-cache-across-asynchronous-tasks)
46/// - [No lock is needed](#no-lock-is-needed)
47/// - [Hashing Algorithm](#hashing-algorithm)
48/// - [Example: Size-based Eviction](#example-size-based-eviction)
49/// - [Example: Time-based Expirations](#example-time-based-expirations)
50/// - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
51/// - [Per-entry expiration policy](#per-entry-expiration-policy)
52/// - [Example: Eviction Listener](#example-eviction-listener)
53/// - [Avoid panicking in the eviction listener](#avoid-panicking-in-the-eviction-listener)
54///
55/// # Example: `insert`, `get` and `invalidate`
56///
57/// Cache entries are manually added using the [`insert`](#method.insert) or
58/// [`get_with`](#method.get_with) method, and are stored in the cache until either
59/// evicted or manually invalidated.
60///
61/// Here's an example of reading and updating a cache by using multiple asynchronous
62/// tasks with [Tokio][tokio-crate] runtime:
63///
64/// [tokio-crate]: https://crates.io/crates/tokio
65///
66///```rust
67/// // Cargo.toml
68/// //
69/// // [dependencies]
70/// // moka = { version = "0.12", features = ["future"] }
71/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
72/// // futures-util = "0.3"
73///
74/// use moka::future::Cache;
75///
76/// #[tokio::main]
77/// async fn main() {
78/// const NUM_TASKS: usize = 16;
79/// const NUM_KEYS_PER_TASK: usize = 64;
80///
81/// fn value(n: usize) -> String {
82/// format!("value {n}")
83/// }
84///
85/// // Create a cache that can store up to 10,000 entries.
86/// let cache = Cache::new(10_000);
87///
88/// // Spawn async tasks and write to and read from the cache.
89/// let tasks: Vec<_> = (0..NUM_TASKS)
90/// .map(|i| {
91/// // To share the same cache across the async tasks, clone it.
92/// // This is a cheap operation.
93/// let my_cache = cache.clone();
94/// let start = i * NUM_KEYS_PER_TASK;
95/// let end = (i + 1) * NUM_KEYS_PER_TASK;
96///
97/// tokio::spawn(async move {
98/// // Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
99/// for key in start..end {
100/// my_cache.insert(key, value(key)).await;
101/// // get() returns Option<String>, a clone of the stored value.
102/// assert_eq!(my_cache.get(&key).await, Some(value(key)));
103/// }
104///
105/// // Invalidate every 4th element of the inserted entries.
106/// for key in (start..end).step_by(4) {
107/// my_cache.invalidate(&key).await;
108/// }
109/// })
110/// })
111/// .collect();
112///
113/// // Wait for all tasks to complete.
114/// futures_util::future::join_all(tasks).await;
115///
116/// // Verify the result.
117/// for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
118/// if key % 4 == 0 {
119/// assert_eq!(cache.get(&key).await, None);
120/// } else {
121/// assert_eq!(cache.get(&key).await, Some(value(key)));
122/// }
123/// }
124/// }
125/// ```
126///
127/// If you want to atomically initialize and insert a value when the key is not
128/// present, you might want to check other insertion methods
129/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
130///
131/// # Avoiding cloning the value at `get`
132///
133/// The return type of the `get` method is `Option<V>` instead of `Option<&V>`. Every
134/// time `get` is called for an existing key, it creates a clone of the stored value
135/// `V` and returns it. This is because the `Cache` allows concurrent updates from
136/// other threads, so a value stored in the cache can be dropped or replaced at any
137/// time by any other thread. `get` cannot return a reference `&V` because it is
138/// impossible to guarantee that the value outlives the reference.
139///
140/// If you want to store values that will be expensive to clone, wrap them in
141/// `std::sync::Arc` before storing them in the cache. [`Arc`][rustdoc-std-arc] is a
142/// thread-safe reference-counted pointer and its `clone()` method is cheap.
143///
144/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
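///
/// For example, a cache of large binary blobs can store `Arc<Vec<u8>>` values so
/// that `get` only clones the pointer, not the buffer. The following is a minimal
/// sketch; the key and value types are illustrative.
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.12", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
///     // Store the (potentially large) value behind an `Arc`.
///     let cache: Cache<String, Arc<Vec<u8>>> = Cache::new(100);
///     cache.insert("blob".to_string(), Arc::new(vec![0u8; 1024])).await;
///
///     // This `get` clones only the `Arc`, which is cheap.
///     let value = cache.get("blob").await;
///     assert_eq!(value.map(|v| v.len()), Some(1024));
/// }
/// ```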
145///
146/// # Sharing a cache across asynchronous tasks
147///
148/// To share a cache across async tasks (or OS threads), do one of the following:
149///
150/// - Create a clone of the cache by calling its `clone` method and pass it to the
151/// other tasks.
152/// - If you are using a web application framework such as Actix Web or Axum, you can
153/// store a cache in Actix Web's [`web::Data`][actix-web-data] or Axum's
154/// [shared state][axum-state-extractor], and access it from each request handler.
155/// - Wrap the cache in a `sync::OnceCell` or `sync::Lazy` from the [once_cell][once-cell-crate]
156/// crate, and assign it to a `static` variable, as sketched below.
157///
158/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
159/// reference-counted pointers to the internal data structures.
160///
161/// [once-cell-crate]: https://crates.io/crates/once_cell
162/// [actix-web-data]: https://docs.rs/actix-web/4.3.1/actix_web/web/struct.Data.html
163/// [axum-state-extractor]: https://docs.rs/axum/latest/axum/#sharing-state-with-handlers
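///
/// As a minimal sketch (assuming the `once_cell` crate; the names are
/// illustrative), a cache can be placed in a `static` variable like this:
///
/// ```rust,ignore
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.12", features = ["future"] }
/// // once_cell = "1"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
/// use once_cell::sync::Lazy;
///
/// // The cache is created lazily on first access and can then be used from any
/// // task or thread without extra synchronization.
/// static CACHE: Lazy<Cache<String, String>> = Lazy::new(|| Cache::new(10_000));
///
/// #[tokio::main]
/// async fn main() {
///     CACHE.insert("key".to_string(), "value".to_string()).await;
///     assert_eq!(CACHE.get("key").await, Some("value".to_string()));
/// }
/// ```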
164///
165/// ## No lock is needed
166///
167/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
168/// by the `Cache` are considered thread-safe, and can be safely called by multiple
169/// async tasks at the same time. No lock is needed.
170///
172///
173/// # Hashing Algorithm
174///
175/// By default, `Cache` uses a hashing algorithm selected to provide resistance
176/// against HashDoS attacks. It will be the same one used by
177/// `std::collections::HashMap`, which is currently SipHash 1-3.
178///
179/// While SipHash's performance is very competitive for medium-sized keys, other
180/// hashing algorithms will outperform it for small keys such as integers as well as
181/// large keys such as long strings. However, those algorithms will typically not
182/// protect against attacks such as HashDoS.
183///
184/// The hashing algorithm can be replaced on a per-`Cache` basis using the
185/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
186/// Many alternative algorithms are available on crates.io, such as the
187/// [AHash][ahash-crate] crate.
188///
189/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
190/// [ahash-crate]: https://crates.io/crates/ahash
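///
/// As a minimal sketch (assuming the [AHash][ahash-crate] crate has been added as
/// a dependency), replacing the hasher could look like this:
///
/// ```rust,ignore
/// // Cargo.toml
/// //
/// // [dependencies]
/// // ahash = "0.8"
/// // moka = { version = "0.12", features = ["future"] }
/// use moka::future::Cache;
///
/// // Build a cache that hashes its keys with aHash instead of SipHash.
/// let cache: Cache<String, String, ahash::RandomState> = Cache::builder()
///     .max_capacity(10_000)
///     .build_with_hasher(ahash::RandomState::default());
/// ```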
191///
192/// # Example: Size-based Eviction
193///
194/// ```rust
195/// // Cargo.toml
196/// //
197/// // [dependencies]
198/// // moka = { version = "0.12", features = ["future"] }
199/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
200/// // futures-util = "0.3"
201///
202/// use moka::future::Cache;
203///
204/// #[tokio::main]
205/// async fn main() {
206/// // Evict based on the number of entries in the cache.
207/// let cache = Cache::builder()
208/// // Up to 10,000 entries.
209/// .max_capacity(10_000)
210/// // Create the cache.
211/// .build();
212/// cache.insert(1, "one".to_string()).await;
213///
214/// // Evict based on the byte length of strings in the cache.
215/// let cache = Cache::builder()
216/// // A weigher closure takes &K and &V and returns a u32
217/// // representing the relative size of the entry.
218/// .weigher(|_key, value: &String| -> u32 {
219/// value.len().try_into().unwrap_or(u32::MAX)
220/// })
221/// // This cache will hold up to 32MiB of values.
222/// .max_capacity(32 * 1024 * 1024)
223/// .build();
224/// cache.insert(2, "two".to_string()).await;
225/// }
226/// ```
227///
228/// If your cache should not grow beyond a certain size, use the `max_capacity`
229/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
230/// will try to evict entries that have not been used recently or very often.
231///
232/// At the cache creation time, a weigher closure can be set by the `weigher` method
233/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
234/// returns a `u32` representing the relative size of the entry:
235///
236/// - If the `weigher` is _not_ set, the cache will treat each entry as having the
237/// same size of `1`. This means the cache will be bounded by the number of entries.
238/// - If the `weigher` is set, the cache will call the weigher to calculate the
239/// weighted size (relative size) of an entry. This means the cache will be bounded
240/// by the total weighted size of entries.
241///
242/// Note that weighted sizes are not used when making eviction selections.
243///
244/// [builder-struct]: ./struct.CacheBuilder.html
245///
246/// # Example: Time-based Expirations
247///
248/// ## Cache-level TTL and TTI policies
249///
250/// `Cache` supports the following cache-level expiration policies:
251///
252/// - **Time to live (TTL)**: A cached entry will be expired after the specified
253/// duration has elapsed since `insert`.
254/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
255/// duration has elapsed since `get` or `insert`.
256///
257/// These are cache-level expiration policies; all entries in the cache will have
258/// the same TTL and/or TTI durations. If you want to set different expiration
259/// durations for different entries, see the next section.
260///
261/// ```rust
262/// // Cargo.toml
263/// //
264/// // [dependencies]
265/// // moka = { version = "0.12", features = ["future"] }
266/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
267/// // futures-util = "0.3"
268///
269/// use moka::future::Cache;
270/// use std::time::Duration;
271///
272/// #[tokio::main]
273/// async fn main() {
274/// let cache = Cache::builder()
275/// // Time to live (TTL): 30 minutes
276/// .time_to_live(Duration::from_secs(30 * 60))
277/// // Time to idle (TTI): 5 minutes
278/// .time_to_idle(Duration::from_secs(5 * 60))
279/// // Create the cache.
280/// .build();
281///
282/// // This entry will expire after 5 minutes (TTI) if there is no get().
283/// cache.insert(0, "zero").await;
284///
285/// // This get() will extend the entry life for another 5 minutes.
286/// cache.get(&0).await;
287///
288/// // Even though we keep calling get(), the entry will expire
289/// // after 30 minutes (TTL) from the insert().
290/// }
291/// ```
292///
293/// ## Per-entry expiration policy
294///
295/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
296///
297/// The `Expiry` trait provides three callback methods:
298/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
299/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
300/// updated, one of these methods is called. These methods return an
301/// `Option<Duration>`, which is used as the expiration duration of the entry.
302///
303/// The `Expiry` trait provides default implementations of these methods, so you only
304/// need to implement the methods you want to customize.
305///
306/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
307/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
308/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
309///
310/// ```rust
311/// // Cargo.toml
312/// //
313/// // [dependencies]
314/// // moka = { version = "0.12", features = ["future"] }
315/// // tokio = { version = "1", features = ["rt-multi-thread", "macros", "time" ] }
316///
317/// use moka::{future::Cache, Expiry};
318/// use std::time::{Duration, Instant};
319///
320/// // In this example, we will create a `future::Cache` with `u32` as the key, and
321/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
322/// // expiration of the value, and `String` is the application data of the value.
323///
324/// /// An enum to represent the expiration of a value.
325/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
326/// pub enum Expiration {
327/// /// The value never expires.
328/// Never,
329/// /// The value expires after a short time. (5 seconds in this example)
330/// AfterShortTime,
331/// /// The value expires after a long time. (15 seconds in this example)
332/// AfterLongTime,
333/// }
334///
335/// impl Expiration {
336/// /// Returns the duration of this expiration.
337/// pub fn as_duration(&self) -> Option<Duration> {
338/// match self {
339/// Expiration::Never => None,
340/// Expiration::AfterShortTime => Some(Duration::from_secs(5)),
341/// Expiration::AfterLongTime => Some(Duration::from_secs(15)),
342/// }
343/// }
344/// }
345///
346/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
347/// /// default implementations of three callback methods `expire_after_create`,
348/// /// `expire_after_read`, and `expire_after_update`.
349/// ///
350/// /// In this example, we only override the `expire_after_create` method.
351/// pub struct MyExpiry;
352///
353/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
354/// /// Returns the duration of the expiration of the value that was just
355/// /// created.
356/// fn expire_after_create(
357/// &self,
358/// _key: &u32,
359/// value: &(Expiration, String),
360/// _current_time: Instant,
361/// ) -> Option<Duration> {
362/// let duration = value.0.as_duration();
363/// println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
364/// duration
365/// }
366/// }
367///
368/// #[tokio::main]
369/// async fn main() {
370/// // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
371/// // eviction listener.
372/// let expiry = MyExpiry;
373///
374/// let eviction_listener = |key, _value, cause| {
375/// println!("Evicted key {key}. Cause: {cause:?}");
376/// };
377///
378/// let cache = Cache::builder()
379/// .max_capacity(100)
380/// .expire_after(expiry)
381/// .eviction_listener(eviction_listener)
382/// .build();
383///
384/// // Insert some entries into the cache with different expirations.
385/// cache
386/// .get_with(0, async { (Expiration::AfterShortTime, "a".to_string()) })
387/// .await;
388///
389/// cache
390/// .get_with(1, async { (Expiration::AfterLongTime, "b".to_string()) })
391/// .await;
392///
393/// cache
394/// .get_with(2, async { (Expiration::Never, "c".to_string()) })
395/// .await;
396///
397/// // Verify that all the inserted entries exist.
398/// assert!(cache.contains_key(&0));
399/// assert!(cache.contains_key(&1));
400/// assert!(cache.contains_key(&2));
401///
402/// // Sleep for 6 seconds. Key 0 should expire.
403/// println!("\nSleeping for 6 seconds...\n");
404/// tokio::time::sleep(Duration::from_secs(6)).await;
405/// cache.run_pending_tasks().await;
406/// println!("Entry count: {}", cache.entry_count());
407///
408/// // Verify that key 0 has been evicted.
409/// assert!(!cache.contains_key(&0));
410/// assert!(cache.contains_key(&1));
411/// assert!(cache.contains_key(&2));
412///
413/// // Sleep for 10 more seconds. Key 1 should expire.
414/// println!("\nSleeping for 10 seconds...\n");
415/// tokio::time::sleep(Duration::from_secs(10)).await;
416/// cache.run_pending_tasks().await;
417/// println!("Entry count: {}", cache.entry_count());
418///
419/// // Verify that key 1 has been evicted.
420/// assert!(!cache.contains_key(&1));
421/// assert!(cache.contains_key(&2));
422///
423/// // Manually invalidate key 2.
424/// cache.invalidate(&2).await;
425/// assert!(!cache.contains_key(&2));
426///
427/// println!("\nSleeping for a second...\n");
428/// tokio::time::sleep(Duration::from_secs(1)).await;
429/// cache.run_pending_tasks().await;
430/// println!("Entry count: {}", cache.entry_count());
431///
432/// println!("\nDone!");
433/// }
434/// ```
435///
436/// # Example: Eviction Listener
437///
438/// A `Cache` can be configured with an eviction listener, a closure that is called
439/// every time there is a cache eviction. The listener takes three parameters: the
440/// key and value of the evicted entry, and the
441/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
442/// entry was evicted.
443///
444/// An eviction listener can be used to keep other data structures in sync with the
445/// cache, for example.
446///
447/// The following example demonstrates how to use an eviction listener with
448/// time-to-live expiration to manage the lifecycle of temporary files on a
449/// filesystem. The cache stores the paths of the files, and when one of them has
450/// expired, the eviction listener will be called with the path, so it can remove the
451/// file from the filesystem.
452///
453/// ```rust
454/// // Cargo.toml
455/// //
456/// // [dependencies]
457/// // anyhow = "1.0"
458/// // uuid = { version = "1.1", features = ["v4"] }
459/// // tokio = { version = "1.18", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] }
460///
461/// use moka::{future::Cache, notification::ListenerFuture};
462/// // FutureExt trait provides the boxed method.
463/// use moka::future::FutureExt;
464///
465/// use anyhow::{anyhow, Context};
466/// use std::{
467/// io,
468/// path::{Path, PathBuf},
469/// sync::Arc,
470/// time::Duration,
471/// };
472/// use tokio::{fs, sync::RwLock};
473/// use uuid::Uuid;
474///
475/// /// The DataFileManager writes, reads and removes data files.
476/// struct DataFileManager {
477/// base_dir: PathBuf,
478/// file_count: usize,
479/// }
480///
481/// impl DataFileManager {
482/// fn new(base_dir: PathBuf) -> Self {
483/// Self {
484/// base_dir,
485/// file_count: 0,
486/// }
487/// }
488///
489/// async fn write_data_file(
490/// &mut self,
491/// key: impl AsRef<str>,
492/// contents: String
493/// ) -> io::Result<PathBuf> {
494/// // Use the key as a part of the filename.
495/// let mut path = self.base_dir.to_path_buf();
496/// path.push(key.as_ref());
497///
498/// assert!(!path.exists(), "Path already exists: {path:?}");
499///
500/// // create the file at the path and write the contents to the file.
501/// fs::write(&path, contents).await?;
502/// self.file_count += 1;
503/// println!("Created a data file at {path:?} (file count: {})", self.file_count);
504/// Ok(path)
505/// }
506///
507/// async fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
508/// // Reads the contents of the file at the path, and return the contents.
509/// fs::read_to_string(path).await
510/// }
511///
512/// async fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
513/// // Remove the file at the path.
514/// fs::remove_file(path.as_ref()).await?;
515/// self.file_count -= 1;
516/// println!(
517/// "Removed a data file at {:?} (file count: {})",
518/// path.as_ref(),
519/// self.file_count
520/// );
521///
522/// Ok(())
523/// }
524/// }
525///
526/// #[tokio::main]
527/// async fn main() -> anyhow::Result<()> {
528/// // Create an instance of the DataFileManager and wrap it with
529/// // Arc<RwLock<_>> so it can be shared across threads.
530/// let mut base_dir = std::env::temp_dir();
531/// base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
532/// println!("base_dir: {base_dir:?}");
533/// std::fs::create_dir(&base_dir)?;
534///
535/// let file_mgr = DataFileManager::new(base_dir);
536/// let file_mgr = Arc::new(RwLock::new(file_mgr));
537///
538/// let file_mgr1 = Arc::clone(&file_mgr);
539/// let rt = tokio::runtime::Handle::current();
540///
541/// // Create an eviction listener closure.
542/// let eviction_listener = move |k, v: PathBuf, cause| -> ListenerFuture {
543/// println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
544/// let file_mgr2 = Arc::clone(&file_mgr1);
545///
546/// // Create a Future that removes the data file at the path `v`.
547/// async move {
548/// // Acquire the write lock of the DataFileManager.
549/// let mut mgr = file_mgr2.write().await;
550/// // Remove the data file. We must handle error cases here to
551/// // prevent the listener from panicking.
552/// if let Err(_e) = mgr.remove_data_file(v.as_path()).await {
553/// eprintln!("Failed to remove a data file at {v:?}");
554/// }
555/// }
556/// // Convert the regular Future into ListenerFuture. This method is
557/// // provided by moka::future::FutureExt trait.
558/// .boxed()
559/// };
560///
561/// // Create the cache. Set time to live for two seconds and set the
562/// // eviction listener.
563/// let cache = Cache::builder()
564/// .max_capacity(100)
565/// .time_to_live(Duration::from_secs(2))
566/// .async_eviction_listener(eviction_listener)
567/// .build();
568///
569/// // Insert an entry to the cache.
570/// // This will create and write a data file for the key "user1", store the
571/// // path of the file to the cache, and return it.
572/// println!("== try_get_with()");
573/// let key = "user1";
574/// let path = cache
575/// .try_get_with(key, async {
576/// let mut mgr = file_mgr.write().await;
577/// let path = mgr
578/// .write_data_file(key, "user data".into())
579/// .await
580/// .with_context(|| format!("Failed to create a data file"))?;
581/// Ok(path) as anyhow::Result<_>
582/// })
583/// .await
584/// .map_err(|e| anyhow!("{e}"))?;
585///
586/// // Read the data file at the path and print the contents.
587/// println!("\n== read_data_file()");
588/// {
589/// let mgr = file_mgr.read().await;
590/// let contents = mgr
591/// .read_data_file(path.as_path())
592/// .await
593/// .with_context(|| format!("Failed to read data from {path:?}"))?;
594/// println!("contents: {contents}");
595/// }
596///
597/// // Sleep for five seconds. While sleeping, the cache entry for key "user1"
598/// // will be expired and evicted, so the eviction listener will be called to
599/// // remove the file.
600/// tokio::time::sleep(Duration::from_secs(5)).await;
601///
602/// cache.run_pending_tasks().await;
603///
604/// Ok(())
605/// }
606/// ```
607///
608/// ## Avoid panicking in the eviction listener
609///
610/// It is very important that an eviction listener closure does not panic.
611/// Otherwise, the cache will stop calling the listener after a panic. This is
612/// intended behavior because the cache cannot know whether it is memory safe or not
613/// to call the panicked listener again.
614///
615/// When a listener panics, the cache will swallow the panic and disable the
616/// listener. If you want to know when a listener panics and the reason for the panic,
617/// you can enable an optional `logging` feature of Moka and check error-level logs.
618///
619/// To enable `logging`, do the following:
620///
621/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
622/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
623/// `info`, ...):
624/// - If you are using the `env_logger` crate, you can achieve this by setting
625/// `RUST_LOG` environment variable to `moka=error`.
626/// 3. If you have more than one cache, you may want to set a distinct name for each
627/// cache by using the cache builder's [`name`][builder-name-method] method. The name
628/// will appear in the log, as shown in the sketch below.
629///
630/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
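///
/// As a minimal sketch (assuming the `env_logger` crate; the cache name and log
/// filter are illustrative):
///
/// ```rust,ignore
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.12", features = ["future", "logging"] }
/// // env_logger = "0.11"
/// use moka::future::Cache;
///
/// // Run the program with the `RUST_LOG=moka=error` environment variable set so
/// // that listener panics are reported through the logger.
/// env_logger::init();
///
/// // Give the cache a distinct name so it can be identified in the log.
/// let cache: Cache<u32, String> = Cache::builder()
///     .name("my-cache")
///     .max_capacity(100)
///     .build();
/// ```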
631///
632pub struct Cache<K, V, S = RandomState> {
633 pub(crate) base: BaseCache<K, V, S>,
634 value_initializer: Arc<ValueInitializer<K, V, S>>,
635
636 #[cfg(test)]
637 schedule_write_op_should_block: AtomicBool,
638}
639
640// TODO: https://github.com/moka-rs/moka/issues/54
641#[allow(clippy::non_send_fields_in_send_ty)]
642unsafe impl<K, V, S> Send for Cache<K, V, S>
643where
644 K: Send + Sync,
645 V: Send + Sync,
646 S: Send,
647{
648}
649
650unsafe impl<K, V, S> Sync for Cache<K, V, S>
651where
652 K: Send + Sync,
653 V: Send + Sync,
654 S: Sync,
655{
656}
657
658// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
659impl<K, V, S> Clone for Cache<K, V, S> {
660 /// Makes a clone of this shared cache.
661 ///
662 /// This operation is cheap as it only creates thread-safe reference counted
663 /// pointers to the shared internal data structures.
664 fn clone(&self) -> Self {
665 Self {
666 base: self.base.clone(),
667 value_initializer: Arc::clone(&self.value_initializer),
668
669 #[cfg(test)]
670 schedule_write_op_should_block: AtomicBool::new(
671 self.schedule_write_op_should_block.load(Ordering::Acquire),
672 ),
673 }
674 }
675}
676
677impl<K, V, S> fmt::Debug for Cache<K, V, S>
678where
679 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
680 V: fmt::Debug + Clone + Send + Sync + 'static,
681 // TODO: Remove these bounds from S.
682 S: BuildHasher + Clone + Send + Sync + 'static,
683{
684 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
685 let mut d_map = f.debug_map();
686
687 for (k, v) in self {
688 d_map.entry(&k, &v);
689 }
690
691 d_map.finish()
692 }
693}
694
695impl<K, V, S> Cache<K, V, S> {
696 /// Returns the cache’s name.
697 pub fn name(&self) -> Option<&str> {
698 self.base.name()
699 }
700
701 /// Returns a read-only cache policy of this cache.
702 ///
703 /// At this time, cache policy cannot be modified after cache creation.
704 /// A future version may support modifying it.
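    ///
    /// # Example
    ///
    /// A minimal sketch reading back the configured policy (the builder settings
    /// are illustrative):
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// use moka::future::Cache;
    /// use std::time::Duration;
    ///
    /// let cache: Cache<u32, String> = Cache::builder()
    ///     .max_capacity(100)
    ///     .time_to_live(Duration::from_secs(60))
    ///     .build();
    ///
    /// let policy = cache.policy();
    /// assert_eq!(policy.max_capacity(), Some(100));
    /// assert_eq!(policy.time_to_live(), Some(Duration::from_secs(60)));
    /// ```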
705 pub fn policy(&self) -> Policy {
706 self.base.policy()
707 }
708
709 /// Returns an approximate number of entries in this cache.
710 ///
711 /// The value returned is _an estimate_; the actual count may differ if there are
712 /// concurrent insertions or removals, or if some entries are pending removal due
713 /// to expiration. This inaccuracy can be mitigated by calling
714 /// `run_pending_tasks` first.
715 ///
716 /// # Example
717 ///
718 /// ```rust
719 /// // Cargo.toml
720 /// //
721 /// // [dependencies]
722 /// // moka = { version = "0.12", features = ["future"] }
723 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
724 /// use moka::future::Cache;
725 ///
726 /// #[tokio::main]
727 /// async fn main() {
728 /// let cache = Cache::new(10);
729 /// cache.insert('n', "Netherland Dwarf").await;
730 /// cache.insert('l', "Lop Eared").await;
731 /// cache.insert('d', "Dutch").await;
732 ///
733 /// // Ensure an entry exists.
734 /// assert!(cache.contains_key(&'n'));
735 ///
736 /// // However, the following may print stale numbers (zeros) instead of threes.
737 /// println!("{}", cache.entry_count()); // -> 0
738 /// println!("{}", cache.weighted_size()); // -> 0
739 ///
740 /// // To mitigate the inaccuracy, call `run_pending_tasks` to run pending
741 /// // internal tasks.
742 /// cache.run_pending_tasks().await;
743 ///
744 /// // The following will print the actual numbers.
745 /// println!("{}", cache.entry_count()); // -> 3
746 /// println!("{}", cache.weighted_size()); // -> 3
747 /// }
748 /// ```
749 ///
750 pub fn entry_count(&self) -> u64 {
751 self.base.entry_count()
752 }
753
754 /// Returns an approximate total weighted size of entries in this cache.
755 ///
756 /// The value returned is _an estimate_; the actual size may differ if there are
757 /// concurrent insertions or removals, or if some entries are pending removal due
758 /// to expiration. This inaccuracy can be mitigated by calling
759 /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for a
760 /// sample code.
761 pub fn weighted_size(&self) -> u64 {
762 self.base.weighted_size()
763 }
764
765 #[cfg(feature = "unstable-debug-counters")]
766 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-debug-counters")))]
767 pub async fn debug_stats(&self) -> CacheDebugStats {
768 self.base.debug_stats().await
769 }
770}
771
772impl<K, V> Cache<K, V, RandomState>
773where
774 K: Hash + Eq + Send + Sync + 'static,
775 V: Clone + Send + Sync + 'static,
776{
777 /// Constructs a new `Cache<K, V>` that will store up to `max_capacity` entries.
778 ///
779 /// To adjust various configuration knobs such as `initial_capacity` or
780 /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
781 ///
782 /// [builder-struct]: ./struct.CacheBuilder.html
783 pub fn new(max_capacity: u64) -> Self {
784 let build_hasher = RandomState::default();
785 Self::with_everything(
786 None,
787 Some(max_capacity),
788 None,
789 build_hasher,
790 None,
791 EvictionPolicy::default(),
792 None,
793 ExpirationPolicy::default(),
794 HousekeeperConfig::default(),
795 false,
796 Clock::default(),
797 )
798 }
799
800 /// Returns a [`CacheBuilder`][builder-struct], which can build a `Cache` with
801 /// various configuration knobs.
802 ///
803 /// [builder-struct]: ./struct.CacheBuilder.html
804 pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
805 CacheBuilder::default()
806 }
807}
808
809impl<K, V, S> Cache<K, V, S>
810where
811 K: Hash + Eq + Send + Sync + 'static,
812 V: Clone + Send + Sync + 'static,
813 S: BuildHasher + Clone + Send + Sync + 'static,
814{
815 // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
816 #[allow(clippy::too_many_arguments)]
817 pub(crate) fn with_everything(
818 name: Option<String>,
819 max_capacity: Option<u64>,
820 initial_capacity: Option<usize>,
821 build_hasher: S,
822 weigher: Option<Weigher<K, V>>,
823 eviction_policy: EvictionPolicy,
824 eviction_listener: Option<AsyncEvictionListener<K, V>>,
825 expiration_policy: ExpirationPolicy<K, V>,
826 housekeeper_config: HousekeeperConfig,
827 invalidator_enabled: bool,
828 clock: Clock,
829 ) -> Self {
830 Self {
831 base: BaseCache::new(
832 name,
833 max_capacity,
834 initial_capacity,
835 build_hasher.clone(),
836 weigher,
837 eviction_policy,
838 eviction_listener,
839 expiration_policy,
840 housekeeper_config,
841 invalidator_enabled,
842 clock,
843 ),
844 value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
845
846 #[cfg(test)]
847 schedule_write_op_should_block: Default::default(), // false
848 }
849 }
850
851 /// Returns `true` if the cache contains a value for the key.
852 ///
853 /// Unlike the `get` method, this method is not considered a cache read operation,
854 /// so it does not update the historic popularity estimator or reset the idle
855 /// timer for the key.
856 ///
857 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
858 /// on the borrowed form _must_ match those for the key type.
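    ///
    /// # Example
    ///
    /// A minimal sketch (note that a `&str` can be used to look up a `String` key):
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let cache: Cache<String, u32> = Cache::new(100);
    ///     cache.insert("key1".to_string(), 1).await;
    ///
    ///     // `contains_key` accepts any borrowed form of the key.
    ///     assert!(cache.contains_key("key1"));
    ///     assert!(!cache.contains_key("key2"));
    /// }
    /// ```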
859 pub fn contains_key<Q>(&self, key: &Q) -> bool
860 where
861 K: Borrow<Q>,
862 Q: Hash + Eq + ?Sized,
863 {
864 self.base.contains_key_with_hash(key, self.base.hash(key))
865 }
866
867 /// Returns a _clone_ of the value corresponding to the key.
868 ///
869 /// If you want to store values that will be expensive to clone, wrap them in
870 /// `std::sync::Arc` before storing them in the cache. [`Arc`][rustdoc-std-arc] is a
871 /// thread-safe reference-counted pointer and its `clone()` method is cheap.
872 ///
873 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
874 /// on the borrowed form _must_ match those for the key type.
875 ///
876 /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
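    ///
    /// # Example
    ///
    /// A minimal sketch showing a simple insert-then-get round trip:
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let cache: Cache<u32, String> = Cache::new(100);
    ///     cache.insert(1, "one".to_string()).await;
    ///
    ///     // `get` returns a clone of the stored value.
    ///     assert_eq!(cache.get(&1).await, Some("one".to_string()));
    ///     assert_eq!(cache.get(&2).await, None);
    /// }
    /// ```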
877 pub async fn get<Q>(&self, key: &Q) -> Option<V>
878 where
879 K: Borrow<Q>,
880 Q: Hash + Eq + ?Sized,
881 {
882 let ignore_if = None as Option<&mut fn(&V) -> bool>;
883
884 self.base
885 .get_with_hash(key, self.base.hash(key), ignore_if, false, true)
886 .await
887 .map(Entry::into_value)
888 }
889
890 /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
891 /// select or insert an entry.
892 ///
893 /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
894 ///
895 /// # Example
896 ///
897 /// ```rust
898 /// // Cargo.toml
899 /// //
900 /// // [dependencies]
901 /// // moka = { version = "0.12", features = ["future"] }
902 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
903 ///
904 /// use moka::future::Cache;
905 ///
906 /// #[tokio::main]
907 /// async fn main() {
908 /// let cache: Cache<String, u32> = Cache::new(100);
909 /// let key = "key1".to_string();
910 ///
911 /// let entry = cache.entry(key.clone()).or_insert(3).await;
912 /// assert!(entry.is_fresh());
913 /// assert_eq!(entry.key(), &key);
914 /// assert_eq!(entry.into_value(), 3);
915 ///
916 /// let entry = cache.entry(key).or_insert(6).await;
917 /// // Not fresh because the value was already in the cache.
918 /// assert!(!entry.is_fresh());
919 /// assert_eq!(entry.into_value(), 3);
920 /// }
921 /// ```
922 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
923 where
924 K: Hash + Eq,
925 {
926 let hash = self.base.hash(&key);
927 OwnedKeyEntrySelector::new(key, hash, self)
928 }
929
930 /// Takes a reference `&Q` to a key and returns a [`RefKeyEntrySelector`] that
931 /// can be used to select or insert an entry.
932 ///
933 /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
934 ///
935 /// # Example
936 ///
937 /// ```rust
938 /// // Cargo.toml
939 /// //
940 /// // [dependencies]
941 /// // moka = { version = "0.12", features = ["future"] }
942 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
943 ///
944 /// use moka::future::Cache;
945 ///
946 /// #[tokio::main]
947 /// async fn main() {
948 /// let cache: Cache<String, u32> = Cache::new(100);
949 /// let key = "key1".to_string();
950 ///
951 /// let entry = cache.entry_by_ref(&key).or_insert(3).await;
952 /// assert!(entry.is_fresh());
953 /// assert_eq!(entry.key(), &key);
954 /// assert_eq!(entry.into_value(), 3);
955 ///
956 /// let entry = cache.entry_by_ref(&key).or_insert(6).await;
957 /// // Not fresh because the value was already in the cache.
958 /// assert!(!entry.is_fresh());
959 /// assert_eq!(entry.into_value(), 3);
960 /// }
961 /// ```
962 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
963 where
964 K: Borrow<Q>,
965 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
966 {
967 let hash = self.base.hash(key);
968 RefKeyEntrySelector::new(key, hash, self)
969 }
970
971 /// Returns a _clone_ of the value corresponding to the key. If the value does
972 /// not exist, resolves the `init` future and inserts the output.
973 ///
974 /// # Concurrent calls on the same key
975 ///
976 /// This method guarantees that concurrent calls on the same non-existent key are
977 /// coalesced into one evaluation of the `init` future. Only one of the calls
978 /// evaluates its future, and other calls wait for that future to resolve.
979 ///
980 /// The following code snippet demonstrates this behavior:
981 ///
982 /// ```rust
983 /// // Cargo.toml
984 /// //
985 /// // [dependencies]
986 /// // moka = { version = "0.12", features = ["future"] }
987 /// // futures-util = "0.3"
988 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
989 /// use moka::future::Cache;
990 /// use std::sync::Arc;
991 ///
992 /// #[tokio::main]
993 /// async fn main() {
994 /// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
995 /// let cache = Cache::new(100);
996 ///
997 /// // Spawn four async tasks.
998 /// let tasks: Vec<_> = (0..4_u8)
999 /// .map(|task_id| {
1000 /// let my_cache = cache.clone();
1001 /// tokio::spawn(async move {
1002 /// println!("Task {task_id} started.");
1003 ///
1004 /// // Insert and get the value for key1. Although all four async
1005 /// // tasks will call `get_with` at the same time, the `init`
1006 /// // async block must be resolved only once.
1007 /// let value = my_cache
1008 /// .get_with("key1", async move {
1009 /// println!("Task {task_id} inserting a value.");
1010 /// Arc::new(vec![0u8; TEN_MIB])
1011 /// })
1012 /// .await;
1013 ///
1014 /// // Ensure the value exists now.
1015 /// assert_eq!(value.len(), TEN_MIB);
1016 /// assert!(my_cache.get(&"key1").await.is_some());
1017 ///
1018 /// println!("Task {task_id} got the value. (len: {})", value.len());
1019 /// })
1020 /// })
1021 /// .collect();
1022 ///
1023 /// // Run all tasks concurrently and wait for them to complete.
1024 /// futures_util::future::join_all(tasks).await;
1025 /// }
1026 /// ```
1027 ///
1028 /// **A Sample Result**
1029 ///
1030 /// - The `init` future (async block) was resolved exactly once by task 3.
1031 /// - Other tasks were blocked until task 3 inserted the value.
1032 ///
1033 /// ```console
1034 /// Task 0 started.
1035 /// Task 3 started.
1036 /// Task 1 started.
1037 /// Task 2 started.
1038 /// Task 3 inserting a value.
1039 /// Task 3 got the value. (len: 10485760)
1040 /// Task 0 got the value. (len: 10485760)
1041 /// Task 1 got the value. (len: 10485760)
1042 /// Task 2 got the value. (len: 10485760)
1043 /// ```
1044 ///
1045 /// # Panics
1046 ///
1047 /// This method panics when the `init` future has panicked. When it happens, only
1048 /// the caller whose `init` future panicked will get the panic (e.g. only task 3
1049 /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1050 /// and 2 above), this method will restart and resolve one of the remaining
1051 /// `init` futures.
1052 ///
1053 pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V {
1054 futures_util::pin_mut!(init);
1055 let hash = self.base.hash(&key);
1056 let key = Arc::new(key);
1057 let replace_if = None as Option<fn(&V) -> bool>;
1058 self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
1059 .await
1060 .into_value()
1061 }
1062
1063 /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
1064 /// key, you can pass a reference to the key. If the key does not exist in the
1065 /// cache, the key will be cloned to create a new entry in the cache.
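    ///
    /// # Example
    ///
    /// A minimal sketch showing that the `init` future is resolved only when the
    /// key is not already present:
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let cache: Cache<String, u32> = Cache::new(100);
    ///
    ///     // Pass the key by reference; it is cloned only when a new entry is
    ///     // actually inserted.
    ///     let value = cache.get_with_by_ref("key1", async { 1 }).await;
    ///     assert_eq!(value, 1);
    ///
    ///     // The entry already exists, so the `init` future is not resolved.
    ///     let value = cache.get_with_by_ref("key1", async { 2 }).await;
    ///     assert_eq!(value, 1);
    /// }
    /// ```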
1066 pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
1067 where
1068 K: Borrow<Q>,
1069 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1070 {
1071 futures_util::pin_mut!(init);
1072 let hash = self.base.hash(key);
1073 let replace_if = None as Option<fn(&V) -> bool>;
1074 self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
1075 .await
1076 .into_value()
1077 }
1078
1079 /// TODO: Remove this in v0.13.0.
1080 /// Deprecated, replaced with
1081 /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
1082 #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
1083 pub async fn get_with_if(
1084 &self,
1085 key: K,
1086 init: impl Future<Output = V>,
1087 replace_if: impl FnMut(&V) -> bool + Send,
1088 ) -> V {
1089 futures_util::pin_mut!(init);
1090 let hash = self.base.hash(&key);
1091 let key = Arc::new(key);
1092 self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
1093 .await
1094 .into_value()
1095 }
1096
1097 /// Returns a _clone_ of the value corresponding to the key. If the value does
1098 /// not exist, resolves the `init` future, and inserts the value if `Some(value)`
1099 /// was returned. If `None` was returned from the future, this method does not
1100 /// insert a value and returns `None`.
1101 ///
1102 /// # Concurrent calls on the same key
1103 ///
1104 /// This method guarantees that concurrent calls on the same non-existent key are
1105 /// coalesced into one evaluation of the `init` future. Only one of the calls
1106 /// evaluates its future, and other calls wait for that future to resolve.
1107 ///
1108 /// The following code snippet demonstrates this behavior:
1109 ///
1110 /// ```rust
1111 /// // Cargo.toml
1112 /// //
1113 /// // [dependencies]
1114 /// // moka = { version = "0.12", features = ["future"] }
1115 /// // futures-util = "0.3"
1116 /// // reqwest = "0.11"
1117 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1118 /// use moka::future::Cache;
1119 ///
1120 /// // This async function tries to get HTML from the given URI.
1121 /// async fn get_html(task_id: u8, uri: &str) -> Option<String> {
1122 /// println!("get_html() called by task {task_id}.");
1123 /// reqwest::get(uri).await.ok()?.text().await.ok()
1124 /// }
1125 ///
1126 /// #[tokio::main]
1127 /// async fn main() {
1128 /// let cache = Cache::new(100);
1129 ///
1130 /// // Spawn four async tasks.
1131 /// let tasks: Vec<_> = (0..4_u8)
1132 /// .map(|task_id| {
1133 /// let my_cache = cache.clone();
1134 /// tokio::spawn(async move {
1135 /// println!("Task {task_id} started.");
1136 ///
1137 /// // Try to insert and get the value for key1. Although
1138 /// // all four async tasks will call `optionally_get_with`
1139 /// // at the same time, get_html() must be called only once.
1140 /// let value = my_cache
1141 /// .optionally_get_with(
1142 /// "key1",
1143 /// get_html(task_id, "https://www.rust-lang.org"),
1144 /// ).await;
1145 ///
1146 /// // Ensure the value exists now.
1147 /// assert!(value.is_some());
1148 /// assert!(my_cache.get(&"key1").await.is_some());
1149 ///
1150 /// println!(
1151 /// "Task {task_id} got the value. (len: {})",
1152 /// value.unwrap().len()
1153 /// );
1154 /// })
1155 /// })
1156 /// .collect();
1157 ///
1158 /// // Run all tasks concurrently and wait for them to complete.
1159 /// futures_util::future::join_all(tasks).await;
1160 /// }
1161 /// ```
1162 ///
1163 /// **A Sample Result**
1164 ///
1165 /// - `get_html()` was called exactly once by task 2.
1166 /// - Other tasks were blocked until task 2 inserted the value.
1167 ///
1168 /// ```console
1169 /// Task 1 started.
1170 /// Task 0 started.
1171 /// Task 2 started.
1172 /// Task 3 started.
1173 /// get_html() called by task 2.
1174 /// Task 2 got the value. (len: 19419)
1175 /// Task 1 got the value. (len: 19419)
1176 /// Task 0 got the value. (len: 19419)
1177 /// Task 3 got the value. (len: 19419)
1178 /// ```
1179 ///
1180 /// # Panics
1181 ///
1182 /// This method panics when the `init` future has panicked. When it happens, only
1183 /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1184 /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1185 /// and 3 above), this method will restart and resolve one of the remaining
1186 /// `init` futures.
1187 ///
1188 pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1189 where
1190 F: Future<Output = Option<V>>,
1191 {
1192 futures_util::pin_mut!(init);
1193 let hash = self.base.hash(&key);
1194 let key = Arc::new(key);
1195 self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1196 .await
1197 .map(Entry::into_value)
1198 }
1199
1200 /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1201 /// of passing an owned key, you can pass a reference to the key. If the key does
1202 /// not exist in the cache, the key will be cloned to create a new entry in the
1203 /// cache.
1204 pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1205 where
1206 F: Future<Output = Option<V>>,
1207 K: Borrow<Q>,
1208 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1209 {
1210 futures_util::pin_mut!(init);
1211 let hash = self.base.hash(key);
1212 self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1213 .await
1214 .map(Entry::into_value)
1215 }
1216
1217 /// Returns a _clone_ of the value corresponding to the key. If the value does
1218 /// not exist, resolves the `init` future, and inserts the value if `Ok(value)`
1219 /// was returned. If `Err(_)` was returned from the future, this method does not
1220 /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1221 ///
1222 /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1223 ///
1224 /// # Concurrent calls on the same key
1225 ///
1226 /// This method guarantees that concurrent calls on the same non-existent key are
1227 /// coalesced into one evaluation of the `init` future (as long as these
1228 /// futures return the same error type). Only one of the calls evaluates its
1229 /// future, and other calls wait for that future to resolve.
1230 ///
1231 /// The following code snippet demonstrates this behavior:
1232 ///
1233 /// ```rust
1234 /// // Cargo.toml
1235 /// //
1236 /// // [dependencies]
1237 /// // moka = { version = "0.12", features = ["future"] }
1238 /// // futures-util = "0.3"
1239 /// // reqwest = "0.11"
1240 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1241 /// use moka::future::Cache;
1242 ///
1243 /// // This async function tries to get HTML from the given URI.
1244 /// async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
1245 /// println!("get_html() called by task {task_id}.");
1246 /// reqwest::get(uri).await?.text().await
1247 /// }
1248 ///
1249 /// #[tokio::main]
1250 /// async fn main() {
1251 /// let cache = Cache::new(100);
1252 ///
1253 /// // Spawn four async tasks.
1254 /// let tasks: Vec<_> = (0..4_u8)
1255 /// .map(|task_id| {
1256 /// let my_cache = cache.clone();
1257 /// tokio::spawn(async move {
1258 /// println!("Task {task_id} started.");
1259 ///
1260 /// // Try to insert and get the value for key1. Although
1261 /// // all four async tasks will call `try_get_with`
1262 /// // at the same time, get_html() must be called only once.
1263 /// let value = my_cache
1264 /// .try_get_with(
1265 /// "key1",
1266 /// get_html(task_id, "https://www.rust-lang.org"),
1267 /// ).await;
1268 ///
1269 /// // Ensure the value exists now.
1270 /// assert!(value.is_ok());
1271 /// assert!(my_cache.get(&"key1").await.is_some());
1272 ///
1273 /// println!(
1274 /// "Task {task_id} got the value. (len: {})",
1275 /// value.unwrap().len()
1276 /// );
1277 /// })
1278 /// })
1279 /// .collect();
1280 ///
1281 /// // Run all tasks concurrently and wait for them to complete.
1282 /// futures_util::future::join_all(tasks).await;
1283 /// }
1284 /// ```
1285 ///
1286 /// **A Sample Result**
1287 ///
1288 /// - `get_html()` was called exactly once by task 2.
1289 /// - Other tasks were blocked until task 2 inserted the value.
1290 ///
1291 /// ```console
1292 /// Task 1 started.
1293 /// Task 0 started.
1294 /// Task 2 started.
1295 /// Task 3 started.
1296 /// get_html() called by task 2.
1297 /// Task 2 got the value. (len: 19419)
1298 /// Task 1 got the value. (len: 19419)
1299 /// Task 0 got the value. (len: 19419)
1300 /// Task 3 got the value. (len: 19419)
1301 /// ```
1302 ///
1303 /// # Panics
1304 ///
1305 /// This method panics when the `init` future has panicked. When it happens, only
1306 /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1307 /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1308 /// and 3 above), this method will restart and resolve one of the remaining
1309 /// `init` futures.
1310 ///
1311 pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1312 where
1313 F: Future<Output = Result<V, E>>,
1314 E: Send + Sync + 'static,
1315 {
1316 futures_util::pin_mut!(init);
1317 let hash = self.base.hash(&key);
1318 let key = Arc::new(key);
1319 self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1320 .await
1321 .map(Entry::into_value)
1322 }
1323
1324 /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1325 /// owned key, you can pass a reference to the key. If the key does not exist in
1326 /// the cache, the key will be cloned to create a new entry in the cache.
1327 pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1328 where
1329 F: Future<Output = Result<V, E>>,
1330 E: Send + Sync + 'static,
1331 K: Borrow<Q>,
1332 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1333 {
1334 futures_util::pin_mut!(init);
1335 let hash = self.base.hash(key);
1336 self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1337 .await
1338 .map(Entry::into_value)
1339 }
1340
1341 /// Inserts a key-value pair into the cache.
1342 ///
1343 /// If the cache has this key present, the value is updated.
1344 pub async fn insert(&self, key: K, value: V) {
1345 let hash = self.base.hash(&key);
1346 let key = Arc::new(key);
1347 self.insert_with_hash(key, hash, value).await;
1348 }
1349
1350 /// Discards any cached value for the key.
1351 ///
1352 /// If you need to get the value that has been discarded, use the
1353 /// [`remove`](#method.remove) method instead.
1354 ///
1355 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1356 /// on the borrowed form _must_ match those for the key type.
1357 pub async fn invalidate<Q>(&self, key: &Q)
1358 where
1359 K: Borrow<Q>,
1360 Q: Hash + Eq + ?Sized,
1361 {
1362 let hash = self.base.hash(key);
1363 self.invalidate_with_hash(key, hash, false).await;
1364 }
1365
1366 /// Discards any cached value for the key and returns a _clone_ of the value.
1367 ///
1368 /// If you do not need to get the value that has been discarded, use the
1369 /// [`invalidate`](#method.invalidate) method instead.
1370 ///
1371 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1372 /// on the borrowed form _must_ match those for the key type.
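    ///
    /// # Example
    ///
    /// A minimal sketch showing that `remove` returns the discarded value:
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let cache: Cache<u32, String> = Cache::new(100);
    ///     cache.insert(1, "one".to_string()).await;
    ///
    ///     // `remove` discards the entry and returns a clone of its value.
    ///     assert_eq!(cache.remove(&1).await, Some("one".to_string()));
    ///     // A second call finds nothing to remove.
    ///     assert_eq!(cache.remove(&1).await, None);
    /// }
    /// ```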
1373 pub async fn remove<Q>(&self, key: &Q) -> Option<V>
1374 where
1375 K: Borrow<Q>,
1376 Q: Hash + Eq + ?Sized,
1377 {
1378 let hash = self.base.hash(key);
1379 self.invalidate_with_hash(key, hash, true).await
1380 }
1381
1382 /// Discards all cached values.
1383 ///
1384 /// This method returns immediately by just setting the current time as the
1385 /// invalidation time. `get` and other retrieval methods are guaranteed not to
1386 /// return the entries inserted before or at the invalidation time.
1387 ///
1388 /// The actual removal of the invalidated entries is done as a maintenance task
1389 /// driven by a user thread. For more details, see
1390 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1391 /// level documentation.
1392 ///
1393 /// Like the `invalidate` method, this method does not clear the historic
1394 /// popularity estimator of keys so that it retains the client activities of
1395 /// trying to retrieve an item.
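    ///
    /// # Example
    ///
    /// A minimal sketch showing that invalidated entries are no longer returned by
    /// `get`, even before the maintenance task removes them:
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let cache: Cache<u32, String> = Cache::new(100);
    ///     cache.insert(1, "one".to_string()).await;
    ///     cache.insert(2, "two".to_string()).await;
    ///
    ///     cache.invalidate_all();
    ///
    ///     // Retrieval methods will not return the invalidated entries, even
    ///     // though the actual removal happens later as a maintenance task.
    ///     assert_eq!(cache.get(&1).await, None);
    ///     assert_eq!(cache.get(&2).await, None);
    /// }
    /// ```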
1396 pub fn invalidate_all(&self) {
1397 self.base.invalidate_all();
1398 }
1399
1400 /// Discards cached values that satisfy a predicate.
1401 ///
1402 /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
1403 /// closure is called against each cached entry inserted before or at the time
1404 /// when this method was called. If the closure returns `true` that entry will be
1405 /// evicted from the cache.
1406 ///
1407 /// This method returns immediately without actually removing the invalidated
1408 /// entries. Instead, it just registers the predicate with the cache, along with the time when
1409 /// this method was called. The actual removal of the invalidated entries is done
1410 /// as a maintenance task driven by a user thread. For more details, see
1411 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1412 /// level documentation.
1413 ///
1414 /// Also, the `get` and other retrieval methods will apply the closure to a cached
1415 /// entry to determine if it should have been invalidated. Therefore, these
1416 /// methods are guaranteed not to return invalidated values.
1417 ///
1418 /// Note that you must call
1419 /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
1420 /// at the cache creation time as the cache needs to maintain additional internal
1421 /// data structures to support this method. Otherwise, calling this method will
1422 /// fail with a
1423 /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
1424 ///
1425 /// Like the `invalidate` method, this method does not clear the historic
1426 /// popularity estimator of keys so that it retains the client activities of
1427 /// trying to retrieve an item.
1428 ///
1429 /// [support-invalidation-closures]:
1430 /// ./struct.CacheBuilder.html#method.support_invalidation_closures
1431 /// [invalidation-disabled-error]:
1432 /// ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
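    ///
    /// # Example
    ///
    /// A minimal sketch showing a predicate that invalidates entries by value (the
    /// cache must be built with `support_invalidation_closures`):
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     // `support_invalidation_closures` must be enabled at build time.
    ///     let cache: Cache<u32, String> = Cache::builder()
    ///         .max_capacity(100)
    ///         .support_invalidation_closures()
    ///         .build();
    ///
    ///     cache.insert(1, "odd".to_string()).await;
    ///     cache.insert(2, "even".to_string()).await;
    ///
    ///     // Invalidate all entries whose value is "odd".
    ///     cache.invalidate_entries_if(|_key, value| value == "odd").unwrap();
    ///
    ///     // Retrieval methods will not return the invalidated entries.
    ///     assert_eq!(cache.get(&1).await, None);
    ///     assert_eq!(cache.get(&2).await, Some("even".to_string()));
    /// }
    /// ```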
1433 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
1434 where
1435 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1436 {
1437 self.base.invalidate_entries_if(Arc::new(predicate))
1438 }
1439
1440 /// Creates an iterator visiting all key-value pairs in arbitrary order. The
1441 /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
1442 /// value.
1443 ///
1444 /// Iterators do not block concurrent reads and writes on the cache. An entry can
1445 /// be inserted into, invalidated, or evicted from the cache while iterators are alive
1446 /// on the same cache.
1447 ///
1448 /// Unlike the `get` method, visiting entries via an iterator does not update the
1449 /// historic popularity estimator or reset idle timers for keys.
1450 ///
1451 /// # Guarantees
1452 ///
1453 /// In order to allow concurrent access to the cache, the iterator's `next` method
1454 /// does _not_ guarantee the following:
1455 ///
1456 /// - It does not guarantee to return a key-value pair (an entry) if its key has
1457 /// been inserted into the cache _after_ the iterator was created.
1458 /// - Such an entry may or may not be returned depending on key's hash and
1459 /// timing.
1460 ///
1461 /// and the `next` method guarantees the following:
1462 ///
1463 /// - It guarantees not to return the same entry more than once.
1464 /// - It guarantees not to return an entry if it has been removed from the cache
1465 /// after the iterator was created.
1466 /// - Note: An entry can be removed for the following reasons:
1467 /// - Manually invalidated.
1468 /// - Expired (e.g. time-to-live).
1469 /// - Evicted because the cache capacity was exceeded.
1470 ///
1471 /// # Examples
1472 ///
1473 /// ```rust
1474 /// // Cargo.toml
1475 /// //
1476 /// // [dependencies]
1477 /// // moka = { version = "0.12", features = ["future"] }
1478 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1479 /// use moka::future::Cache;
1480 ///
1481 /// #[tokio::main]
1482 /// async fn main() {
1483 /// let cache = Cache::new(100);
1484 /// cache.insert("Julia", 14).await;
1485 ///
1486 /// let mut iter = cache.iter();
1487 /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
1488 /// assert_eq!(*k, "Julia");
1489 /// assert_eq!(v, 14);
1490 ///
1491 /// assert!(iter.next().is_none());
1492 /// }
1493 /// ```
1494 ///
1495 pub fn iter(&self) -> Iter<'_, K, V> {
1496 use crate::sync_base::iter::{Iter as InnerIter, ScanningGet};
1497
1498 let inner = InnerIter::with_single_cache_segment(&self.base, self.base.num_cht_segments());
1499 Iter::new(inner)
1500 }
1501
1502 /// Performs any pending maintenance operations needed by the cache.
1503 pub async fn run_pending_tasks(&self) {
1504 if let Some(hk) = &self.base.housekeeper {
1505 self.base.retry_interrupted_ops().await;
1506 hk.run_pending_tasks(Arc::clone(&self.base.inner)).await;
1507 }
1508 }
1509}
1510
1511impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
1512where
1513 K: Hash + Eq + Send + Sync + 'static,
1514 V: Clone + Send + Sync + 'static,
1515 S: BuildHasher + Clone + Send + Sync + 'static,
1516{
1517 type Item = (Arc<K>, V);
1518
1519 type IntoIter = Iter<'a, K, V>;
1520
1521 fn into_iter(self) -> Self::IntoIter {
1522 self.iter()
1523 }
1524}
1525
1526//
1527// private methods
1528//
1529impl<K, V, S> Cache<K, V, S>
1530where
1531 K: Hash + Eq + Send + Sync + 'static,
1532 V: Clone + Send + Sync + 'static,
1533 S: BuildHasher + Clone + Send + Sync + 'static,
1534{
1535 pub(crate) async fn get_or_insert_with_hash_and_fun(
1536 &self,
1537 key: Arc<K>,
1538 hash: u64,
1539 init: Pin<&mut impl Future<Output = V>>,
1540 mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1541 need_key: bool,
1542 ) -> Entry<K, V> {
1543 let maybe_entry = self
1544 .base
1545 .get_with_hash(&key, hash, replace_if.as_mut(), need_key, true)
1546 .await;
1547 if let Some(entry) = maybe_entry {
1548 entry
1549 } else {
1550 self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1551 .await
1552 }
1553 }
1554
1555 pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1556 &self,
1557 key: &Q,
1558 hash: u64,
1559 init: Pin<&mut impl Future<Output = V>>,
1560 mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1561 need_key: bool,
1562 ) -> Entry<K, V>
1563 where
1564 K: Borrow<Q>,
1565 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1566 {
1567 let maybe_entry = self
1568 .base
1569 .get_with_hash(key, hash, replace_if.as_mut(), need_key, true)
1570 .await;
1571 if let Some(entry) = maybe_entry {
1572 entry
1573 } else {
1574 let key = Arc::new(key.to_owned());
1575 self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1576 .await
1577 }
1578 }
1579
1580 async fn insert_with_hash_and_fun(
1581 &self,
1582 key: Arc<K>,
1583 hash: u64,
1584 init: Pin<&mut impl Future<Output = V>>,
1585 replace_if: Option<impl FnMut(&V) -> bool + Send>,
1586 need_key: bool,
1587 ) -> Entry<K, V> {
1588 let k = if need_key {
1589 Some(Arc::clone(&key))
1590 } else {
1591 None
1592 };
1593
1594 let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1595 let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1596
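// `try_init_or_read` resolves the init future at most once per key: if
// multiple tasks call the `get_with` family for the same key concurrently,
// only one of them runs the future and the others wait and receive
// `ReadExisting` with the resolved value.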
1597 match self
1598 .value_initializer
1599 .try_init_or_read(&key, hash, type_id, self, replace_if, init, post_init)
1600 .await
1601 {
1602 InitResult::Initialized(v) => {
1603 crossbeam_epoch::pin().flush();
1604 Entry::new(k, v, true, false)
1605 }
1606 InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1607 InitResult::InitErr(_) => unreachable!(),
1608 }
1609 }
1610
1611 pub(crate) async fn get_or_insert_with_hash(
1612 &self,
1613 key: Arc<K>,
1614 hash: u64,
1615 init: impl FnOnce() -> V,
1616 ) -> Entry<K, V> {
1617 match self
1618 .base
1619 .get_with_hash(&key, hash, never_ignore(), true, true)
1620 .await
1621 {
1622 Some(entry) => entry,
1623 None => {
1624 let value = init();
1625 self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1626 .await;
1627 Entry::new(Some(key), value, true, false)
1628 }
1629 }
1630 }
1631
1632 pub(crate) async fn get_or_insert_with_hash_by_ref<Q>(
1633 &self,
1634 key: &Q,
1635 hash: u64,
1636 init: impl FnOnce() -> V,
1637 ) -> Entry<K, V>
1638 where
1639 K: Borrow<Q>,
1640 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1641 {
1642 match self
1643 .base
1644 .get_with_hash(key, hash, never_ignore(), true, true)
1645 .await
1646 {
1647 Some(entry) => entry,
1648 None => {
1649 let key = Arc::new(key.to_owned());
1650 let value = init();
1651 self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1652 .await;
1653 Entry::new(Some(key), value, true, false)
1654 }
1655 }
1656 }
1657
1658 pub(crate) async fn get_or_optionally_insert_with_hash_and_fun<F>(
1659 &self,
1660 key: Arc<K>,
1661 hash: u64,
1662 init: Pin<&mut F>,
1663 need_key: bool,
1664 ) -> Option<Entry<K, V>>
1665 where
1666 F: Future<Output = Option<V>>,
1667 {
1668 let entry = self
1669 .base
1670 .get_with_hash(&key, hash, never_ignore(), need_key, true)
1671 .await;
1672 if entry.is_some() {
1673 return entry;
1674 }
1675
1676 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1677 .await
1678 }
1679
1680 pub(crate) async fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1681 &self,
1682 key: &Q,
1683 hash: u64,
1684 init: Pin<&mut F>,
1685 need_key: bool,
1686 ) -> Option<Entry<K, V>>
1687 where
1688 F: Future<Output = Option<V>>,
1689 K: Borrow<Q>,
1690 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1691 {
1692 let entry = self
1693 .base
1694 .get_with_hash(key, hash, never_ignore(), need_key, true)
1695 .await;
1696 if entry.is_some() {
1697 return entry;
1698 }
1699
1700 let key = Arc::new(key.to_owned());
1701 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1702 .await
1703 }
1704
1705 async fn optionally_insert_with_hash_and_fun<F>(
1706 &self,
1707 key: Arc<K>,
1708 hash: u64,
1709 init: Pin<&mut F>,
1710 need_key: bool,
1711 ) -> Option<Entry<K, V>>
1712 where
1713 F: Future<Output = Option<V>>,
1714 {
1715 let k = if need_key {
1716 Some(Arc::clone(&key))
1717 } else {
1718 None
1719 };
1720
1721 let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1722 let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1723
1724 match self
1725 .value_initializer
1726 .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1727 .await
1728 {
1729 InitResult::Initialized(v) => {
1730 crossbeam_epoch::pin().flush();
1731 Some(Entry::new(k, v, true, false))
1732 }
1733 InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1734 InitResult::InitErr(_) => None,
1735 }
1736 }
1737
1738 pub(super) async fn get_or_try_insert_with_hash_and_fun<F, E>(
1739 &self,
1740 key: Arc<K>,
1741 hash: u64,
1742 init: Pin<&mut F>,
1743 need_key: bool,
1744 ) -> Result<Entry<K, V>, Arc<E>>
1745 where
1746 F: Future<Output = Result<V, E>>,
1747 E: Send + Sync + 'static,
1748 {
1749 if let Some(entry) = self
1750 .base
1751 .get_with_hash(&key, hash, never_ignore(), need_key, true)
1752 .await
1753 {
1754 return Ok(entry);
1755 }
1756
1757 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1758 .await
1759 }
1760
1761 pub(super) async fn get_or_try_insert_with_hash_by_ref_and_fun<F, E, Q>(
1762 &self,
1763 key: &Q,
1764 hash: u64,
1765 init: Pin<&mut F>,
1766 need_key: bool,
1767 ) -> Result<Entry<K, V>, Arc<E>>
1768 where
1769 F: Future<Output = Result<V, E>>,
1770 E: Send + Sync + 'static,
1771 K: Borrow<Q>,
1772 Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1773 {
1774 if let Some(entry) = self
1775 .base
1776 .get_with_hash(key, hash, never_ignore(), need_key, true)
1777 .await
1778 {
1779 return Ok(entry);
1780 }
1781 let key = Arc::new(key.to_owned());
1782 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1783 .await
1784 }
1785
1786 async fn try_insert_with_hash_and_fun<F, E>(
1787 &self,
1788 key: Arc<K>,
1789 hash: u64,
1790 init: Pin<&mut F>,
1791 need_key: bool,
1792 ) -> Result<Entry<K, V>, Arc<E>>
1793 where
1794 F: Future<Output = Result<V, E>>,
1795 E: Send + Sync + 'static,
1796 {
1797 let k = if need_key {
1798 Some(Arc::clone(&key))
1799 } else {
1800 None
1801 };
1802
1803 let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1804 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1805
1806 match self
1807 .value_initializer
1808 .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1809 .await
1810 {
1811 InitResult::Initialized(v) => {
1812 crossbeam_epoch::pin().flush();
1813 Ok(Entry::new(k, v, true, false))
1814 }
1815 InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1816 InitResult::InitErr(e) => {
1817 crossbeam_epoch::pin().flush();
1818 Err(e)
1819 }
1820 }
1821 }
1822
1823 pub(crate) async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
1824 if self.base.is_map_disabled() {
1825 return;
1826 }
1827
1828 let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await;
1829 let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts);
1830 cancel_guard.set_op(op.clone());
1831
1832 let should_block;
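// `should_block` exists for unit tests: they can set
// `schedule_write_op_should_block` to make `schedule_write_op` block, which
// helps exercise otherwise hard-to-hit races. Outside of `cfg(test)` it is
// always `false`.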
1833 #[cfg(not(test))]
1834 {
1835 should_block = false;
1836 }
1837 #[cfg(test)]
1838 {
1839 should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
1840 }
1841
1842 let hk = self.base.housekeeper.as_ref();
1843 let event = self.base.write_op_ch_ready_event();
1844
1845 BaseCache::<K, V, S>::schedule_write_op(
1846 &self.base.inner,
1847 &self.base.write_op_ch,
1848 event,
1849 op,
1850 ts,
1851 hk,
1852 should_block,
1853 )
1854 .await
1855 .expect("Failed to schedule write op for insert");
1856 cancel_guard.clear();
1857 }
1858
1859 pub(crate) async fn compute_with_hash_and_fun<F, Fut>(
1860 &self,
1861 key: Arc<K>,
1862 hash: u64,
1863 f: F,
1864 ) -> compute::CompResult<K, V>
1865 where
1866 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1867 Fut: Future<Output = compute::Op<V>>,
1868 {
1869 let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1870 match self
1871 .value_initializer
1872 .try_compute(key, hash, self, f, post_init, true)
1873 .await
1874 {
1875 Ok(result) => result,
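// The init future used by `compute_with` returns a plain `Op<V>` (not a
// `Result`), so `try_compute` can never report an error here.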
1876 Err(_) => unreachable!(),
1877 }
1878 }
1879
1880 pub(crate) async fn try_compute_with_hash_and_fun<F, Fut, E>(
1881 &self,
1882 key: Arc<K>,
1883 hash: u64,
1884 f: F,
1885 ) -> Result<compute::CompResult<K, V>, E>
1886 where
1887 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1888 Fut: Future<Output = Result<compute::Op<V>, E>>,
1889 E: Send + Sync + 'static,
1890 {
1891 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1892 self.value_initializer
1893 .try_compute(key, hash, self, f, post_init, true)
1894 .await
1895 }
1896
1897 pub(crate) async fn try_compute_if_nobody_else_with_hash_and_fun<F, Fut, E>(
1898 &self,
1899 key: Arc<K>,
1900 hash: u64,
1901 f: F,
1902 ) -> Result<compute::CompResult<K, V>, E>
1903 where
1904 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1905 Fut: Future<Output = Result<compute::Op<V>, E>>,
1906 E: Send + Sync + 'static,
1907 {
1908 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with_if_nobody_else;
1909 self.value_initializer
1910 .try_compute_if_nobody_else(key, hash, self, f, post_init, true)
1911 .await
1912 }
1913
1914 pub(crate) async fn upsert_with_hash_and_fun<F, Fut>(
1915 &self,
1916 key: Arc<K>,
1917 hash: u64,
1918 f: F,
1919 ) -> Entry<K, V>
1920 where
1921 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1922 Fut: Future<Output = V>,
1923 {
1924 let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1925 match self
1926 .value_initializer
1927 .try_compute(key, hash, self, f, post_init, false)
1928 .await
1929 {
1930 Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
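// `post_init_for_upsert_with` always stores the computed value, so the only
// possible outcomes are `Inserted` and `ReplacedWith`; errors and removals
// cannot occur here.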
1931 _ => unreachable!(),
1932 }
1933 }
1934
1935 pub(crate) async fn invalidate_with_hash<Q>(
1936 &self,
1937 key: &Q,
1938 hash: u64,
1939 need_value: bool,
1940 ) -> Option<V>
1941 where
1942 K: Borrow<Q>,
1943 Q: Hash + Eq + ?Sized,
1944 {
1945 use futures_util::FutureExt;
1946
1947 self.base.retry_interrupted_ops().await;
1948
1949 // Lock the key for removal if blocking removal notification is enabled.
1950 let mut kl = None;
1951 let mut klg = None;
1952 if self.base.is_removal_notifier_enabled() {
1953 // To lock the key, we have to get an Arc<K> for the key (&Q).
1954 //
1955 // TODO: Enhance this if possible. This is rather a hack for now,
1956 // because it cannot prevent race conditions like this:
1957 //
1958 // 1. We miss the key because it does not exist, so we do not lock
1959 //    the key.
1960 // 2. Somebody else (another thread) inserts the key.
1961 // 3. We remove the entry for the key, but without the key lock!
1962 //
1963 if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
1964 kl = self.base.maybe_key_lock(&arc_key);
1965 klg = if let Some(lock) = &kl {
1966 Some(lock.lock().await)
1967 } else {
1968 None
1969 };
1970 }
1971 }
1972
1973 match self.base.remove_entry(key, hash) {
1974 None => None,
1975 Some(kv) => {
1976 let now = self.base.current_time();
1977
1978 let maybe_v = if need_value {
1979 Some(kv.entry.value.clone())
1980 } else {
1981 None
1982 };
1983
1984 let info = kv.entry.entry_info();
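// Bump the entry generation and record it in the remove op below; the
// generation lets the pending-op processing tell this removal apart from
// older queued ops for the same `EntryInfo` (see the write-op race tests
// near the bottom of this file).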
1985 let entry_gen = info.incr_entry_gen();
1986
1987 let op: WriteOp<K, V> = WriteOp::Remove {
1988 kv_entry: kv.clone(),
1989 entry_gen,
1990 };
1991
1992 // Async Cancellation Safety: To ensure the future below is executed
1993 // even if the caller's async task is cancelled, we create a cancel
1994 // guard for the future (and the op). If the caller is cancelled while
1995 // we are awaiting the future, the cancel guard will save the future
1996 // and the op to the interrupted_op_ch channel, so that we can
1997 // resume/retry them later.
1998 let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);
1999
2000 if self.base.is_removal_notifier_enabled() {
2001 let future = self
2002 .base
2003 .notify_invalidate(&kv.key, &kv.entry)
2004 .boxed()
2005 .shared();
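// `boxed().shared()` lets the cancel guard hold a clone of the same future,
// so the notification can be re-awaited on retry instead of being lost if
// this task is cancelled mid-await.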
2006 cancel_guard.set_future_and_op(future.clone(), op.clone());
2007 // Send notification to the eviction listener.
2008 future.await;
2009 cancel_guard.unset_future();
2010 } else {
2011 cancel_guard.set_op(op.clone());
2012 }
2013
2014 // Drop the locks before scheduling the write op to avoid a potential
2015 // deadlock. (Scheduling the write op can spin-lock while the queue is
2016 // full, and the queue is drained by the housekeeping thread, which can
2017 // lock the same key.)
2018 std::mem::drop(klg);
2019 std::mem::drop(kl);
2020
2021 let should_block;
2022 #[cfg(not(test))]
2023 {
2024 should_block = false;
2025 }
2026 #[cfg(test)]
2027 {
2028 should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
2029 }
2030
2031 let event = self.base.write_op_ch_ready_event();
2032 let hk = self.base.housekeeper.as_ref();
2033
2034 BaseCache::<K, V, S>::schedule_write_op(
2035 &self.base.inner,
2036 &self.base.write_op_ch,
2037 event,
2038 op,
2039 now,
2040 hk,
2041 should_block,
2042 )
2043 .await
2044 .expect("Failed to schedule write op for remove");
2045 cancel_guard.clear();
2046
2047 crossbeam_epoch::pin().flush();
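// Flushing the epoch guard moves locally deferred destructors to the global
// queue so that memory retired by this removal can be reclaimed sooner.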
2048 maybe_v
2049 }
2050 }
2051 }
2052}
2053
2054 // For unit tests.
2056#[cfg(test)]
2057impl<K, V, S> Cache<K, V, S> {
2058 pub(crate) fn is_table_empty(&self) -> bool {
2059 self.entry_count() == 0
2060 }
2061
2062 pub(crate) fn is_waiter_map_empty(&self) -> bool {
2063 self.value_initializer.waiter_count() == 0
2064 }
2065}
2066
2067#[cfg(test)]
2068impl<K, V, S> Cache<K, V, S>
2069where
2070 K: Hash + Eq + Send + Sync + 'static,
2071 V: Clone + Send + Sync + 'static,
2072 S: BuildHasher + Clone + Send + Sync + 'static,
2073{
2074 fn invalidation_predicate_count(&self) -> usize {
2075 self.base.invalidation_predicate_count()
2076 }
2077
2078 async fn reconfigure_for_testing(&mut self) {
2079 self.base.reconfigure_for_testing().await;
2080 }
2081
2082 fn key_locks_map_is_empty(&self) -> bool {
2083 self.base.key_locks_map_is_empty()
2084 }
2085
2086 fn run_pending_tasks_initiation_count(&self) -> usize {
2087 self.base
2088 .housekeeper
2089 .as_ref()
2090 .map(|hk| hk.start_count.load(Ordering::Acquire))
2091 .expect("housekeeper is not set")
2092 }
2093
2094 fn run_pending_tasks_completion_count(&self) -> usize {
2095 self.base
2096 .housekeeper
2097 .as_ref()
2098 .map(|hk| hk.complete_count.load(Ordering::Acquire))
2099 .expect("housekeeper is not set")
2100 }
2101}
2102
2103 // As of Rust 1.71, we cannot make this function a `const fn` because mutable
2104 // references are not allowed in `const fn`.
2105 // See [#57349](https://github.com/rust-lang/rust/issues/57349).
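// Returning `None` here means "no `replace_if` predicate": callers that use it
// never ask for an existing cached value to be replaced.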
2106#[inline]
2107fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
2108 None
2109}
2110
2111 // To see the debug prints, run the tests with `cargo test -- --nocapture`.
2112#[cfg(test)]
2113mod tests {
2114 use super::Cache;
2115 use crate::{
2116 common::{time::Clock, HousekeeperConfig},
2117 future::FutureExt,
2118 notification::{ListenerFuture, RemovalCause},
2119 ops::compute,
2120 policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
2121 Expiry,
2122 };
2123
2124 use async_lock::{Barrier, Mutex};
2125 use std::{
2126 convert::Infallible,
2127 sync::{
2128 atomic::{AtomicU32, AtomicU8, Ordering},
2129 Arc,
2130 },
2131 time::{Duration, Instant as StdInstant},
2132 vec,
2133 };
2134 use tokio::time::sleep;
2135
2136 #[test]
2137 fn futures_are_send() {
2138 let cache = Cache::new(0);
2139
2140 fn is_send(_: impl Send) {}
2141
2142 // pub fns
2143 is_send(cache.get(&()));
2144 is_send(cache.get_with((), async {}));
2145 is_send(cache.get_with_by_ref(&(), async {}));
2146 #[allow(deprecated)]
2147 is_send(cache.get_with_if((), async {}, |_| false));
2148 is_send(cache.insert((), ()));
2149 is_send(cache.invalidate(&()));
2150 is_send(cache.optionally_get_with((), async { None }));
2151 is_send(cache.optionally_get_with_by_ref(&(), async { None }));
2152 is_send(cache.remove(&()));
2153 is_send(cache.run_pending_tasks());
2154 is_send(cache.try_get_with((), async { Err(()) }));
2155 is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));
2156
2157 // entry fns
2158 is_send(
2159 cache
2160 .entry(())
2161 .and_compute_with(|_| async { compute::Op::Nop }),
2162 );
2163 is_send(
2164 cache
2165 .entry(())
2166 .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
2167 );
2168 is_send(cache.entry(()).and_upsert_with(|_| async {}));
2169 is_send(cache.entry(()).or_default());
2170 is_send(cache.entry(()).or_insert(()));
2171 is_send(cache.entry(()).or_insert_with(async {}));
2172 is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
2173 is_send(cache.entry(()).or_optionally_insert_with(async { None }));
2174 is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));
2175
2176 // entry_by_ref fns
2177 is_send(
2178 cache
2179 .entry_by_ref(&())
2180 .and_compute_with(|_| async { compute::Op::Nop }),
2181 );
2182 is_send(
2183 cache
2184 .entry_by_ref(&())
2185 .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
2186 );
2187 is_send(cache.entry_by_ref(&()).and_upsert_with(|_| async {}));
2188 is_send(cache.entry_by_ref(&()).or_default());
2189 is_send(cache.entry_by_ref(&()).or_insert(()));
2190 is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
2191 is_send(
2192 cache
2193 .entry_by_ref(&())
2194 .or_insert_with_if(async {}, |_| false),
2195 );
2196 is_send(
2197 cache
2198 .entry_by_ref(&())
2199 .or_optionally_insert_with(async { None }),
2200 );
2201 is_send(
2202 cache
2203 .entry_by_ref(&())
2204 .or_try_insert_with(async { Err(()) }),
2205 );
2206 }
2207
2208 #[tokio::test]
2209 async fn max_capacity_zero() {
2210 let mut cache = Cache::new(0);
2211 cache.reconfigure_for_testing().await;
2212
2213 // Make the cache exterior immutable.
2214 let cache = cache;
2215
2216 cache.insert(0, ()).await;
2217
2218 assert!(!cache.contains_key(&0));
2219 assert!(cache.get(&0).await.is_none());
2220 cache.run_pending_tasks().await;
2221 assert!(!cache.contains_key(&0));
2222 assert!(cache.get(&0).await.is_none());
2223 assert_eq!(cache.entry_count(), 0)
2224 }
2225
2226 #[tokio::test]
2227 async fn basic_single_async_task() {
2228 // The following `Vec`s will hold actual and expected notifications.
2229 let actual = Arc::new(Mutex::new(Vec::new()));
2230 let mut expected = Vec::new();
2231
2232 // Create an eviction listener.
2233 let a1 = Arc::clone(&actual);
2234 let listener = move |k, v, cause| -> ListenerFuture {
2235 let a2 = Arc::clone(&a1);
2236 async move {
2237 a2.lock().await.push((k, v, cause));
2238 }
2239 .boxed()
2240 };
2241
2242 // Create a cache with the eviction listener.
2243 let mut cache = Cache::builder()
2244 .max_capacity(3)
2245 .async_eviction_listener(listener)
2246 .build();
2247 cache.reconfigure_for_testing().await;
2248
2249 // Make the cache exterior immutable.
2250 let cache = cache;
2251
2252 cache.insert("a", "alice").await;
2253 cache.insert("b", "bob").await;
2254 assert_eq!(cache.get(&"a").await, Some("alice"));
2255 assert!(cache.contains_key(&"a"));
2256 assert!(cache.contains_key(&"b"));
2257 assert_eq!(cache.get(&"b").await, Some("bob"));
2258 cache.run_pending_tasks().await;
2259 // counts: a -> 1, b -> 1
2260
2261 cache.insert("c", "cindy").await;
2262 assert_eq!(cache.get(&"c").await, Some("cindy"));
2263 assert!(cache.contains_key(&"c"));
2264 // counts: a -> 1, b -> 1, c -> 1
2265 cache.run_pending_tasks().await;
2266
2267 assert!(cache.contains_key(&"a"));
2268 assert_eq!(cache.get(&"a").await, Some("alice"));
2269 assert_eq!(cache.get(&"b").await, Some("bob"));
2270 assert!(cache.contains_key(&"b"));
2271 cache.run_pending_tasks().await;
2272 // counts: a -> 2, b -> 2, c -> 1
2273
2274 // "d" should not be admitted because its frequency is too low.
2275 cache.insert("d", "david").await; // count: d -> 0
2276 expected.push((Arc::new("d"), "david", RemovalCause::Size));
2277 cache.run_pending_tasks().await;
2278 assert_eq!(cache.get(&"d").await, None); // d -> 1
2279 assert!(!cache.contains_key(&"d"));
2280
2281 cache.insert("d", "david").await;
2282 expected.push((Arc::new("d"), "david", RemovalCause::Size));
2283 cache.run_pending_tasks().await;
2284 assert!(!cache.contains_key(&"d"));
2285 assert_eq!(cache.get(&"d").await, None); // d -> 2
2286
2287 // "d" should be admitted and "c" should be evicted
2288 // because d's frequency is higher than c's.
2289 cache.insert("d", "dennis").await;
2290 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2291 cache.run_pending_tasks().await;
2292 assert_eq!(cache.get(&"a").await, Some("alice"));
2293 assert_eq!(cache.get(&"b").await, Some("bob"));
2294 assert_eq!(cache.get(&"c").await, None);
2295 assert_eq!(cache.get(&"d").await, Some("dennis"));
2296 assert!(cache.contains_key(&"a"));
2297 assert!(cache.contains_key(&"b"));
2298 assert!(!cache.contains_key(&"c"));
2299 assert!(cache.contains_key(&"d"));
2300
2301 cache.invalidate(&"b").await;
2302 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2303 cache.run_pending_tasks().await;
2304 assert_eq!(cache.get(&"b").await, None);
2305 assert!(!cache.contains_key(&"b"));
2306
2307 assert!(cache.remove(&"b").await.is_none());
2308 assert_eq!(cache.remove(&"d").await, Some("dennis"));
2309 expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
2310 cache.run_pending_tasks().await;
2311 assert_eq!(cache.get(&"d").await, None);
2312 assert!(!cache.contains_key(&"d"));
2313
2314 verify_notification_vec(&cache, actual, &expected).await;
2315 assert!(cache.key_locks_map_is_empty());
2316 }
2317
2318 #[tokio::test]
2319 async fn basic_lru_single_thread() {
2320 // The following `Vec`s will hold actual and expected notifications.
2321 let actual = Arc::new(Mutex::new(Vec::new()));
2322 let mut expected = Vec::new();
2323
2324 // Create an eviction listener.
2325 let a1 = Arc::clone(&actual);
2326 let listener = move |k, v, cause| -> ListenerFuture {
2327 let a2 = Arc::clone(&a1);
2328 async move {
2329 a2.lock().await.push((k, v, cause));
2330 }
2331 .boxed()
2332 };
2333
2334 // Create a cache with the eviction listener.
2335 let mut cache = Cache::builder()
2336 .max_capacity(3)
2337 .eviction_policy(EvictionPolicy::lru())
2338 .async_eviction_listener(listener)
2339 .build();
2340 cache.reconfigure_for_testing().await;
2341
2342 // Make the cache exterior immutable.
2343 let cache = cache;
2344
2345 cache.insert("a", "alice").await;
2346 cache.insert("b", "bob").await;
2347 assert_eq!(cache.get(&"a").await, Some("alice"));
2348 assert!(cache.contains_key(&"a"));
2349 assert!(cache.contains_key(&"b"));
2350 assert_eq!(cache.get(&"b").await, Some("bob"));
2351 cache.run_pending_tasks().await;
2352 // a -> b
2353
2354 cache.insert("c", "cindy").await;
2355 assert_eq!(cache.get(&"c").await, Some("cindy"));
2356 assert!(cache.contains_key(&"c"));
2357 cache.run_pending_tasks().await;
2358 // a -> b -> c
2359
2360 assert!(cache.contains_key(&"a"));
2361 assert_eq!(cache.get(&"a").await, Some("alice"));
2362 assert_eq!(cache.get(&"b").await, Some("bob"));
2363 assert!(cache.contains_key(&"b"));
2364 cache.run_pending_tasks().await;
2365 // c -> a -> b
2366
2367 // "d" should be admitted because the cache uses the LRU strategy.
2368 cache.insert("d", "david").await;
2369 // "c" is the LRU entry and should have been evicted.
2370 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2371 cache.run_pending_tasks().await;
2372
2373 assert_eq!(cache.get(&"a").await, Some("alice"));
2374 assert_eq!(cache.get(&"b").await, Some("bob"));
2375 assert_eq!(cache.get(&"c").await, None);
2376 assert_eq!(cache.get(&"d").await, Some("david"));
2377 assert!(cache.contains_key(&"a"));
2378 assert!(cache.contains_key(&"b"));
2379 assert!(!cache.contains_key(&"c"));
2380 assert!(cache.contains_key(&"d"));
2381 cache.run_pending_tasks().await;
2382 // a -> b -> d
2383
2384 cache.invalidate(&"b").await;
2385 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2386 cache.run_pending_tasks().await;
2387 // a -> d
2388 assert_eq!(cache.get(&"b").await, None);
2389 assert!(!cache.contains_key(&"b"));
2390
2391 assert!(cache.remove(&"b").await.is_none());
2392 assert_eq!(cache.remove(&"d").await, Some("david"));
2393 expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
2394 cache.run_pending_tasks().await;
2395 // a
2396 assert_eq!(cache.get(&"d").await, None);
2397 assert!(!cache.contains_key(&"d"));
2398
2399 cache.insert("e", "emily").await;
2400 cache.insert("f", "frank").await;
2401 // "a" should be evicted because it is the LRU.
2402 cache.insert("g", "gina").await;
2403 expected.push((Arc::new("a"), "alice", RemovalCause::Size));
2404 cache.run_pending_tasks().await;
2405 // e -> f -> g
2406 assert_eq!(cache.get(&"a").await, None);
2407 assert_eq!(cache.get(&"e").await, Some("emily"));
2408 assert_eq!(cache.get(&"f").await, Some("frank"));
2409 assert_eq!(cache.get(&"g").await, Some("gina"));
2410 assert!(!cache.contains_key(&"a"));
2411 assert!(cache.contains_key(&"e"));
2412 assert!(cache.contains_key(&"f"));
2413 assert!(cache.contains_key(&"g"));
2414
2415 verify_notification_vec(&cache, actual, &expected).await;
2416 assert!(cache.key_locks_map_is_empty());
2417 }
2418
2419 #[tokio::test]
2420 async fn size_aware_eviction() {
2421 let weigher = |_k: &&str, v: &(&str, u32)| v.1;
2422
2423 let alice = ("alice", 10);
2424 let bob = ("bob", 15);
2425 let bill = ("bill", 20);
2426 let cindy = ("cindy", 5);
2427 let david = ("david", 15);
2428 let dennis = ("dennis", 15);
2429
2430 // The following `Vec`s will hold actual and expected notifications.
2431 let actual = Arc::new(Mutex::new(Vec::new()));
2432 let mut expected = Vec::new();
2433
2434 // Create an eviction listener.
2435 let a1 = Arc::clone(&actual);
2436 let listener = move |k, v, cause| -> ListenerFuture {
2437 let a2 = Arc::clone(&a1);
2438 async move {
2439 a2.lock().await.push((k, v, cause));
2440 }
2441 .boxed()
2442 };
2443
2444 // Create a cache with the eviction listener.
2445 let mut cache = Cache::builder()
2446 .max_capacity(31)
2447 .weigher(weigher)
2448 .async_eviction_listener(listener)
2449 .build();
2450 cache.reconfigure_for_testing().await;
2451
2452 // Make the cache exterior immutable.
2453 let cache = cache;
2454
2455 cache.insert("a", alice).await;
2456 cache.insert("b", bob).await;
2457 assert_eq!(cache.get(&"a").await, Some(alice));
2458 assert!(cache.contains_key(&"a"));
2459 assert!(cache.contains_key(&"b"));
2460 assert_eq!(cache.get(&"b").await, Some(bob));
2461 cache.run_pending_tasks().await;
2462 // order (LRU -> MRU) and counts: a -> 1, b -> 1
2463
2464 cache.insert("c", cindy).await;
2465 assert_eq!(cache.get(&"c").await, Some(cindy));
2466 assert!(cache.contains_key(&"c"));
2467 // order and counts: a -> 1, b -> 1, c -> 1
2468 cache.run_pending_tasks().await;
2469
2470 assert!(cache.contains_key(&"a"));
2471 assert_eq!(cache.get(&"a").await, Some(alice));
2472 assert_eq!(cache.get(&"b").await, Some(bob));
2473 assert!(cache.contains_key(&"b"));
2474 cache.run_pending_tasks().await;
2475 // order and counts: c -> 1, a -> 2, b -> 2
2476
2477 // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
2478 // "d" must have higher count than 3, which is the aggregated count
2479 // "d" must have a higher count than 3, which is the aggregated count
2480 cache.insert("d", david).await; // count: d -> 0
2481 expected.push((Arc::new("d"), david, RemovalCause::Size));
2482 cache.run_pending_tasks().await;
2483 assert_eq!(cache.get(&"d").await, None); // d -> 1
2484 assert!(!cache.contains_key(&"d"));
2485
2486 cache.insert("d", david).await;
2487 expected.push((Arc::new("d"), david, RemovalCause::Size));
2488 cache.run_pending_tasks().await;
2489 assert!(!cache.contains_key(&"d"));
2490 assert_eq!(cache.get(&"d").await, None); // d -> 2
2491
2492 cache.insert("d", david).await;
2493 expected.push((Arc::new("d"), david, RemovalCause::Size));
2494 cache.run_pending_tasks().await;
2495 assert_eq!(cache.get(&"d").await, None); // d -> 3
2496 assert!(!cache.contains_key(&"d"));
2497
2498 cache.insert("d", david).await;
2499 expected.push((Arc::new("d"), david, RemovalCause::Size));
2500 cache.run_pending_tasks().await;
2501 assert!(!cache.contains_key(&"d"));
2502 assert_eq!(cache.get(&"d").await, None); // d -> 4
2503
2504 // Finally "d" should be admitted by evicting "c" and "a".
2505 cache.insert("d", dennis).await;
2506 expected.push((Arc::new("c"), cindy, RemovalCause::Size));
2507 expected.push((Arc::new("a"), alice, RemovalCause::Size));
2508 cache.run_pending_tasks().await;
2509 assert_eq!(cache.get(&"a").await, None);
2510 assert_eq!(cache.get(&"b").await, Some(bob));
2511 assert_eq!(cache.get(&"c").await, None);
2512 assert_eq!(cache.get(&"d").await, Some(dennis));
2513 assert!(!cache.contains_key(&"a"));
2514 assert!(cache.contains_key(&"b"));
2515 assert!(!cache.contains_key(&"c"));
2516 assert!(cache.contains_key(&"d"));
2517
2518 // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
2519 cache.insert("b", bill).await;
2520 expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
2521 expected.push((Arc::new("d"), dennis, RemovalCause::Size));
2522 cache.run_pending_tasks().await;
2523 assert_eq!(cache.get(&"b").await, Some(bill));
2524 assert_eq!(cache.get(&"d").await, None);
2525 assert!(cache.contains_key(&"b"));
2526 assert!(!cache.contains_key(&"d"));
2527
2528 // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
2529 cache.insert("a", alice).await;
2530 cache.insert("b", bob).await;
2531 expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
2532 cache.run_pending_tasks().await;
2533 assert_eq!(cache.get(&"a").await, Some(alice));
2534 assert_eq!(cache.get(&"b").await, Some(bob));
2535 assert_eq!(cache.get(&"d").await, None);
2536 assert!(cache.contains_key(&"a"));
2537 assert!(cache.contains_key(&"b"));
2538 assert!(!cache.contains_key(&"d"));
2539
2540 // Verify the sizes.
2541 assert_eq!(cache.entry_count(), 2);
2542 assert_eq!(cache.weighted_size(), 25);
2543
2544 verify_notification_vec(&cache, actual, &expected).await;
2545 assert!(cache.key_locks_map_is_empty());
2546 }
2547
2548 #[tokio::test]
2549 async fn basic_multi_async_tasks() {
2550 let num_tasks = 2;
2551 let num_threads = 2;
2552
2553 let cache = Cache::new(100);
2554 let barrier = Arc::new(Barrier::new(num_tasks + num_threads as usize));
2555
2556 let tasks = (0..num_tasks)
2557 .map(|id| {
2558 let cache = cache.clone();
2559 let barrier = Arc::clone(&barrier);
2560
2561 tokio::spawn(async move {
2562 barrier.wait().await;
2563
2564 cache.insert(10, format!("{id}-100")).await;
2565 cache.get(&10).await;
2566 cache.insert(20, format!("{id}-200")).await;
2567 cache.invalidate(&10).await;
2568 })
2569 })
2570 .collect::<Vec<_>>();
2571
2572 let threads = (0..num_threads)
2573 .map(|id| {
2574 let cache = cache.clone();
2575 let barrier = Arc::clone(&barrier);
2576 let rt = tokio::runtime::Handle::current();
2577
2578 std::thread::spawn(move || {
2579 rt.block_on(barrier.wait());
2580
2581 rt.block_on(cache.insert(10, format!("{id}-100")));
2582 rt.block_on(cache.get(&10));
2583 rt.block_on(cache.insert(20, format!("{id}-200")));
2584 rt.block_on(cache.invalidate(&10));
2585 })
2586 })
2587 .collect::<Vec<_>>();
2588
2589 let _ = futures_util::future::join_all(tasks).await;
2590 threads.into_iter().for_each(|t| t.join().unwrap());
2591
2592 assert!(cache.get(&10).await.is_none());
2593 assert!(cache.get(&20).await.is_some());
2594 assert!(!cache.contains_key(&10));
2595 assert!(cache.contains_key(&20));
2596 }
2597
2598 #[tokio::test]
2599 async fn invalidate_all() {
2600 // The following `Vec`s will hold actual and expected notifications.
2601 let actual = Arc::new(Mutex::new(Vec::new()));
2602 let mut expected = Vec::new();
2603
2604 // Create an eviction listener.
2605 let a1 = Arc::clone(&actual);
2606 let listener = move |k, v, cause| -> ListenerFuture {
2607 let a2 = Arc::clone(&a1);
2608 async move {
2609 a2.lock().await.push((k, v, cause));
2610 }
2611 .boxed()
2612 };
2613
2614 // Create a cache with the eviction listener.
2615 let mut cache = Cache::builder()
2616 .max_capacity(100)
2617 .async_eviction_listener(listener)
2618 .build();
2619 cache.reconfigure_for_testing().await;
2620
2621 // Make the cache exterior immutable.
2622 let cache = cache;
2623
2624 cache.insert("a", "alice").await;
2625 cache.insert("b", "bob").await;
2626 cache.insert("c", "cindy").await;
2627 assert_eq!(cache.get(&"a").await, Some("alice"));
2628 assert_eq!(cache.get(&"b").await, Some("bob"));
2629 assert_eq!(cache.get(&"c").await, Some("cindy"));
2630 assert!(cache.contains_key(&"a"));
2631 assert!(cache.contains_key(&"b"));
2632 assert!(cache.contains_key(&"c"));
2633
2634 // `cache.run_pending_tasks().await` is no longer needed here before invalidating. The
2635 // last modified timestamps of the entries were updated when they were inserted.
2636 // https://github.com/moka-rs/moka/issues/155
2637
2638 cache.invalidate_all();
2639 expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
2640 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2641 expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
2642 cache.run_pending_tasks().await;
2643
2644 cache.insert("d", "david").await;
2645 cache.run_pending_tasks().await;
2646
2647 assert!(cache.get(&"a").await.is_none());
2648 assert!(cache.get(&"b").await.is_none());
2649 assert!(cache.get(&"c").await.is_none());
2650 assert_eq!(cache.get(&"d").await, Some("david"));
2651 assert!(!cache.contains_key(&"a"));
2652 assert!(!cache.contains_key(&"b"));
2653 assert!(!cache.contains_key(&"c"));
2654 assert!(cache.contains_key(&"d"));
2655
2656 verify_notification_vec(&cache, actual, &expected).await;
2657 }
2658
2659 // This test is for https://github.com/moka-rs/moka/issues/155
2660 #[tokio::test]
2661 async fn invalidate_all_without_running_pending_tasks() {
2662 let cache = Cache::new(1024);
2663
2664 assert_eq!(cache.get(&0).await, None);
2665 cache.insert(0, 1).await;
2666 assert_eq!(cache.get(&0).await, Some(1));
2667
2668 cache.invalidate_all();
2669 assert_eq!(cache.get(&0).await, None);
2670 }
2671
2672 #[tokio::test]
2673 async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
2674 use std::collections::HashSet;
2675
2676 // The following `Vec`s will hold actual and expected notifications.
2677 let actual = Arc::new(Mutex::new(Vec::new()));
2678 let mut expected = Vec::new();
2679
2680 // Create an eviction listener.
2681 let a1 = Arc::clone(&actual);
2682 let listener = move |k, v, cause| -> ListenerFuture {
2683 let a2 = Arc::clone(&a1);
2684 async move {
2685 a2.lock().await.push((k, v, cause));
2686 }
2687 .boxed()
2688 };
2689
2690 let (clock, mock) = Clock::mock();
2691
2692 // Create a cache with the eviction listener.
2693 let mut cache = Cache::builder()
2694 .max_capacity(100)
2695 .support_invalidation_closures()
2696 .async_eviction_listener(listener)
2697 .clock(clock)
2698 .build();
2699 cache.reconfigure_for_testing().await;
2700
2701 // Make the cache exterior immutable.
2702 let cache = cache;
2703
2704 cache.insert(0, "alice").await;
2705 cache.insert(1, "bob").await;
2706 cache.insert(2, "alex").await;
2707 cache.run_pending_tasks().await;
2708
2709 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2710 cache.run_pending_tasks().await;
2711
2712 assert_eq!(cache.get(&0).await, Some("alice"));
2713 assert_eq!(cache.get(&1).await, Some("bob"));
2714 assert_eq!(cache.get(&2).await, Some("alex"));
2715 assert!(cache.contains_key(&0));
2716 assert!(cache.contains_key(&1));
2717 assert!(cache.contains_key(&2));
2718
2719 let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
2720 cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
2721 assert_eq!(cache.invalidation_predicate_count(), 1);
2722 expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
2723 expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
2724
2725 mock.increment(Duration::from_secs(5)); // 10 secs from the start.
2726
2727 cache.insert(3, "alice").await;
2728
2729 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2730 cache.run_pending_tasks().await; // To submit the invalidation task.
2731 std::thread::sleep(Duration::from_millis(200));
2732 cache.run_pending_tasks().await; // To process the task result.
2733 std::thread::sleep(Duration::from_millis(200));
2734
2735 assert!(cache.get(&0).await.is_none());
2736 assert!(cache.get(&2).await.is_none());
2737 assert_eq!(cache.get(&1).await, Some("bob"));
2738 // This should survive as it was inserted after calling invalidate_entries_if.
2739 assert_eq!(cache.get(&3).await, Some("alice"));
2740
2741 assert!(!cache.contains_key(&0));
2742 assert!(cache.contains_key(&1));
2743 assert!(!cache.contains_key(&2));
2744 assert!(cache.contains_key(&3));
2745
2746 assert_eq!(cache.entry_count(), 2);
2747 assert_eq!(cache.invalidation_predicate_count(), 0);
2748
2749 mock.increment(Duration::from_secs(5)); // 15 secs from the start.
2750
2751 cache.invalidate_entries_if(|_k, &v| v == "alice")?;
2752 cache.invalidate_entries_if(|_k, &v| v == "bob")?;
2753 assert_eq!(cache.invalidation_predicate_count(), 2);
2754 // key 1 was inserted before key 3.
2755 expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
2756 expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
2757
2758 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2759 cache.run_pending_tasks().await; // To submit the invalidation task.
2760 std::thread::sleep(Duration::from_millis(200));
2761 cache.run_pending_tasks().await; // To process the task result.
2762 std::thread::sleep(Duration::from_millis(200));
2763
2764 assert!(cache.get(&1).await.is_none());
2765 assert!(cache.get(&3).await.is_none());
2766
2767 assert!(!cache.contains_key(&1));
2768 assert!(!cache.contains_key(&3));
2769
2770 assert_eq!(cache.entry_count(), 0);
2771 assert_eq!(cache.invalidation_predicate_count(), 0);
2772
2773 verify_notification_vec(&cache, actual, &expected).await;
2774
2775 Ok(())
2776 }
2777
2778 #[tokio::test]
2779 async fn time_to_live() {
2780 // The following `Vec`s will hold actual and expected notifications.
2781 let actual = Arc::new(Mutex::new(Vec::new()));
2782 let mut expected = Vec::new();
2783
2784 // Create an eviction listener.
2785 let a1 = Arc::clone(&actual);
2786 let listener = move |k, v, cause| -> ListenerFuture {
2787 let a2 = Arc::clone(&a1);
2788 async move {
2789 a2.lock().await.push((k, v, cause));
2790 }
2791 .boxed()
2792 };
2793
2794 let (clock, mock) = Clock::mock();
2795
2796 // Create a cache with the eviction listener.
2797 let mut cache = Cache::builder()
2798 .max_capacity(100)
2799 .time_to_live(Duration::from_secs(10))
2800 .async_eviction_listener(listener)
2801 .clock(clock)
2802 .build();
2803 cache.reconfigure_for_testing().await;
2804
2805 // Make the cache exterior immutable.
2806 let cache = cache;
2807
2808 cache.insert("a", "alice").await;
2809 cache.run_pending_tasks().await;
2810
2811 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2812 cache.run_pending_tasks().await;
2813
2814 assert_eq!(cache.get(&"a").await, Some("alice"));
2815 assert!(cache.contains_key(&"a"));
2816
2817 mock.increment(Duration::from_secs(5)); // 10 secs.
2818 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2819 assert_eq!(cache.get(&"a").await, None);
2820 assert!(!cache.contains_key(&"a"));
2821
2822 assert_eq!(cache.iter().count(), 0);
2823
2824 cache.run_pending_tasks().await;
2825 assert!(cache.is_table_empty());
2826
2827 cache.insert("b", "bob").await;
2828 cache.run_pending_tasks().await;
2829
2830 assert_eq!(cache.entry_count(), 1);
2831
2832 mock.increment(Duration::from_secs(5)); // 15 secs.
2833 cache.run_pending_tasks().await;
2834
2835 assert_eq!(cache.get(&"b").await, Some("bob"));
2836 assert!(cache.contains_key(&"b"));
2837 assert_eq!(cache.entry_count(), 1);
2838
2839 cache.insert("b", "bill").await;
2840 expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2841 cache.run_pending_tasks().await;
2842
2843 mock.increment(Duration::from_secs(5)); // 20 secs
2844 cache.run_pending_tasks().await;
2845
2846 assert_eq!(cache.get(&"b").await, Some("bill"));
2847 assert!(cache.contains_key(&"b"));
2848 assert_eq!(cache.entry_count(), 1);
2849
2850 mock.increment(Duration::from_secs(5)); // 25 secs
2851 expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2852
2853 assert_eq!(cache.get(&"a").await, None);
2854 assert_eq!(cache.get(&"b").await, None);
2855 assert!(!cache.contains_key(&"a"));
2856 assert!(!cache.contains_key(&"b"));
2857
2858 assert_eq!(cache.iter().count(), 0);
2859
2860 cache.run_pending_tasks().await;
2861 assert!(cache.is_table_empty());
2862
2863 verify_notification_vec(&cache, actual, &expected).await;
2864 }
2865
2866 #[tokio::test]
2867 async fn time_to_idle() {
2868 // The following `Vec`s will hold actual and expected notifications.
2869 let actual = Arc::new(Mutex::new(Vec::new()));
2870 let mut expected = Vec::new();
2871
2872 // Create an eviction listener.
2873 let a1 = Arc::clone(&actual);
2874 let listener = move |k, v, cause| -> ListenerFuture {
2875 let a2 = Arc::clone(&a1);
2876 async move {
2877 a2.lock().await.push((k, v, cause));
2878 }
2879 .boxed()
2880 };
2881
2882 let (clock, mock) = Clock::mock();
2883
2884 // Create a cache with the eviction listener.
2885 let mut cache = Cache::builder()
2886 .max_capacity(100)
2887 .time_to_idle(Duration::from_secs(10))
2888 .async_eviction_listener(listener)
2889 .clock(clock)
2890 .build();
2891 cache.reconfigure_for_testing().await;
2892
2893 // Make the cache exterior immutable.
2894 let cache = cache;
2895
2896 cache.insert("a", "alice").await;
2897 cache.run_pending_tasks().await;
2898
2899 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2900 cache.run_pending_tasks().await;
2901
2902 assert_eq!(cache.get(&"a").await, Some("alice"));
2903
2904 mock.increment(Duration::from_secs(5)); // 10 secs.
2905 cache.run_pending_tasks().await;
2906
2907 cache.insert("b", "bob").await;
2908 cache.run_pending_tasks().await;
2909
2910 assert_eq!(cache.entry_count(), 2);
2911
2912 mock.increment(Duration::from_secs(2)); // 12 secs.
2913 cache.run_pending_tasks().await;
2914
2915 // contains_key does not reset the idle timer for the key.
2916 assert!(cache.contains_key(&"a"));
2917 assert!(cache.contains_key(&"b"));
2918 cache.run_pending_tasks().await;
2919
2920 assert_eq!(cache.entry_count(), 2);
2921
2922 mock.increment(Duration::from_secs(3)); // 15 secs.
2923 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2924
2925 assert_eq!(cache.get(&"a").await, None);
2926 assert_eq!(cache.get(&"b").await, Some("bob"));
2927 assert!(!cache.contains_key(&"a"));
2928 assert!(cache.contains_key(&"b"));
2929
2930 assert_eq!(cache.iter().count(), 1);
2931
2932 cache.run_pending_tasks().await;
2933 assert_eq!(cache.entry_count(), 1);
2934
2935 mock.increment(Duration::from_secs(10)); // 25 secs
2936 expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2937
2938 assert_eq!(cache.get(&"a").await, None);
2939 assert_eq!(cache.get(&"b").await, None);
2940 assert!(!cache.contains_key(&"a"));
2941 assert!(!cache.contains_key(&"b"));
2942
2943 assert_eq!(cache.iter().count(), 0);
2944
2945 cache.run_pending_tasks().await;
2946 assert!(cache.is_table_empty());
2947
2948 verify_notification_vec(&cache, actual, &expected).await;
2949 }
2950
2951 // https://github.com/moka-rs/moka/issues/359
2952 #[tokio::test]
2953 async fn ensure_access_time_is_updated_immediately_after_read() {
2954 let (clock, mock) = Clock::mock();
2955 let mut cache = Cache::builder()
2956 .max_capacity(10)
2957 .time_to_idle(Duration::from_secs(5))
2958 .clock(clock)
2959 .build();
2960 cache.reconfigure_for_testing().await;
2961 // Make the cache exterior immutable.
2962 let cache = cache;
2963
2964 cache.insert(1, 1).await;
2965
2966 mock.increment(Duration::from_secs(4));
2967 assert_eq!(cache.get(&1).await, Some(1));
2968
2969 mock.increment(Duration::from_secs(2));
2970 assert_eq!(cache.get(&1).await, Some(1));
2971 cache.run_pending_tasks().await;
2972 assert_eq!(cache.get(&1).await, Some(1));
2973 }
2974
2975 #[tokio::test]
2976 async fn time_to_live_by_expiry_type() {
2977 // The following `Vec`s will hold actual and expected notifications.
2978 let actual = Arc::new(Mutex::new(Vec::new()));
2979 let mut expected = Vec::new();
2980
2981 // Define an expiry type.
2982 struct MyExpiry {
2983 counters: Arc<ExpiryCallCounters>,
2984 }
2985
2986 impl MyExpiry {
2987 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2988 Self { counters }
2989 }
2990 }
2991
2992 impl Expiry<&str, &str> for MyExpiry {
2993 fn expire_after_create(
2994 &self,
2995 _key: &&str,
2996 _value: &&str,
2997 _current_time: StdInstant,
2998 ) -> Option<Duration> {
2999 self.counters.incl_actual_creations();
3000 Some(Duration::from_secs(10))
3001 }
3002
3003 fn expire_after_update(
3004 &self,
3005 _key: &&str,
3006 _value: &&str,
3007 _current_time: StdInstant,
3008 _current_duration: Option<Duration>,
3009 ) -> Option<Duration> {
3010 self.counters.incl_actual_updates();
3011 Some(Duration::from_secs(10))
3012 }
3013 }
3014
3015 // Create an eviction listener.
3016 let a1 = Arc::clone(&actual);
3017 let listener = move |k, v, cause| -> ListenerFuture {
3018 let a2 = Arc::clone(&a1);
3019 async move {
3020 a2.lock().await.push((k, v, cause));
3021 }
3022 .boxed()
3023 };
3024
3025 // Create expiry counters and the expiry.
3026 let expiry_counters = Arc::new(ExpiryCallCounters::default());
3027 let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
3028
3029 let (clock, mock) = Clock::mock();
3030
3031 // Create a cache with the expiry and eviction listener.
3032 let mut cache = Cache::builder()
3033 .max_capacity(100)
3034 .expire_after(expiry)
3035 .async_eviction_listener(listener)
3036 .clock(clock)
3037 .build();
3038 cache.reconfigure_for_testing().await;
3039
3040 // Make the cache exterior immutable.
3041 let cache = cache;
3042
3043 cache.insert("a", "alice").await;
3044 expiry_counters.incl_expected_creations();
3045 cache.run_pending_tasks().await;
3046
3047 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
3048 cache.run_pending_tasks().await;
3049
3050 assert_eq!(cache.get(&"a").await, Some("alice"));
3051 assert!(cache.contains_key(&"a"));
3052
3053 mock.increment(Duration::from_secs(5)); // 10 secs.
3054 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
3055 assert_eq!(cache.get(&"a").await, None);
3056 assert!(!cache.contains_key(&"a"));
3057
3058 assert_eq!(cache.iter().count(), 0);
3059
3060 cache.run_pending_tasks().await;
3061 assert!(cache.is_table_empty());
3062
3063 cache.insert("b", "bob").await;
3064 expiry_counters.incl_expected_creations();
3065 cache.run_pending_tasks().await;
3066
3067 assert_eq!(cache.entry_count(), 1);
3068
3069 mock.increment(Duration::from_secs(5)); // 15 secs.
3070 cache.run_pending_tasks().await;
3071
3072 assert_eq!(cache.get(&"b").await, Some("bob"));
3073 assert!(cache.contains_key(&"b"));
3074 assert_eq!(cache.entry_count(), 1);
3075
3076 cache.insert("b", "bill").await;
3077 expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
3078 expiry_counters.incl_expected_updates();
3079 cache.run_pending_tasks().await;
3080
3081 mock.increment(Duration::from_secs(5)); // 20 secs
3082 cache.run_pending_tasks().await;
3083
3084 assert_eq!(cache.get(&"b").await, Some("bill"));
3085 assert!(cache.contains_key(&"b"));
3086 assert_eq!(cache.entry_count(), 1);
3087
3088 mock.increment(Duration::from_secs(5)); // 25 secs
3089 expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
3090
3091 assert_eq!(cache.get(&"a").await, None);
3092 assert_eq!(cache.get(&"b").await, None);
3093 assert!(!cache.contains_key(&"a"));
3094 assert!(!cache.contains_key(&"b"));
3095
3096 assert_eq!(cache.iter().count(), 0);
3097
3098 cache.run_pending_tasks().await;
3099 assert!(cache.is_table_empty());
3100
3101 expiry_counters.verify();
3102 verify_notification_vec(&cache, actual, &expected).await;
3103 }
3104
3105 #[tokio::test]
3106 async fn time_to_idle_by_expiry_type() {
3107 // The following `Vec`s will hold actual and expected notifications.
3108 let actual = Arc::new(Mutex::new(Vec::new()));
3109 let mut expected = Vec::new();
3110
3111 // Define an expiry type.
3112 struct MyExpiry {
3113 counters: Arc<ExpiryCallCounters>,
3114 }
3115
3116 impl MyExpiry {
3117 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
3118 Self { counters }
3119 }
3120 }
3121
3122 impl Expiry<&str, &str> for MyExpiry {
3123 fn expire_after_read(
3124 &self,
3125 _key: &&str,
3126 _value: &&str,
3127 _current_time: StdInstant,
3128 _current_duration: Option<Duration>,
3129 _last_modified_at: StdInstant,
3130 ) -> Option<Duration> {
3131 self.counters.incl_actual_reads();
3132 Some(Duration::from_secs(10))
3133 }
3134 }
3135
3136 // Create an eviction listener.
3137 let a1 = Arc::clone(&actual);
3138 let listener = move |k, v, cause| -> ListenerFuture {
3139 let a2 = Arc::clone(&a1);
3140 async move {
3141 a2.lock().await.push((k, v, cause));
3142 }
3143 .boxed()
3144 };
3145
3146 // Create expiry counters and the expiry.
3147 let expiry_counters = Arc::new(ExpiryCallCounters::default());
3148 let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
3149
3150 let (clock, mock) = Clock::mock();
3151
3152 // Create a cache with the expiry and eviction listener.
3153 let mut cache = Cache::builder()
3154 .max_capacity(100)
3155 .expire_after(expiry)
3156 .async_eviction_listener(listener)
3157 .clock(clock)
3158 .build();
3159 cache.reconfigure_for_testing().await;
3160
3161 // Make the cache exterior immutable.
3162 let cache = cache;
3163
3164 cache.insert("a", "alice").await;
3165 cache.run_pending_tasks().await;
3166
3167 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
3168 cache.run_pending_tasks().await;
3169
3170 assert_eq!(cache.get(&"a").await, Some("alice"));
3171 expiry_counters.incl_expected_reads();
3172
3173 mock.increment(Duration::from_secs(5)); // 10 secs.
3174 cache.run_pending_tasks().await;
3175
3176 cache.insert("b", "bob").await;
3177 cache.run_pending_tasks().await;
3178
3179 assert_eq!(cache.entry_count(), 2);
3180
3181 mock.increment(Duration::from_secs(2)); // 12 secs.
3182 cache.run_pending_tasks().await;
3183
3184 // contains_key does not reset the idle timer for the key.
3185 assert!(cache.contains_key(&"a"));
3186 assert!(cache.contains_key(&"b"));
3187 cache.run_pending_tasks().await;
3188
3189 assert_eq!(cache.entry_count(), 2);
3190
3191 mock.increment(Duration::from_secs(3)); // 15 secs.
3192 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
3193
3194 assert_eq!(cache.get(&"a").await, None);
3195 assert_eq!(cache.get(&"b").await, Some("bob"));
3196 expiry_counters.incl_expected_reads();
3197 assert!(!cache.contains_key(&"a"));
3198 assert!(cache.contains_key(&"b"));
3199
3200 assert_eq!(cache.iter().count(), 1);
3201
3202 cache.run_pending_tasks().await;
3203 assert_eq!(cache.entry_count(), 1);
3204
3205 mock.increment(Duration::from_secs(10)); // 25 secs
3206 expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
3207
3208 assert_eq!(cache.get(&"a").await, None);
3209 assert_eq!(cache.get(&"b").await, None);
3210 assert!(!cache.contains_key(&"a"));
3211 assert!(!cache.contains_key(&"b"));
3212
3213 assert_eq!(cache.iter().count(), 0);
3214
3215 cache.run_pending_tasks().await;
3216 assert!(cache.is_table_empty());
3217
3218 expiry_counters.verify();
3219 verify_notification_vec(&cache, actual, &expected).await;
3220 }
3221
3222 /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
3223 /// only when the key was already present in the cache.
3224 #[tokio::test]
3225 async fn test_expiry_using_get_with() {
3226 // Define an expiry type, which always return `None`.
3227 // Define an expiry type, which always returns `None`.
3228 counters: Arc<ExpiryCallCounters>,
3229 }
3230
3231 impl NoExpiry {
3232 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
3233 Self { counters }
3234 }
3235 }
3236
3237 impl Expiry<&str, &str> for NoExpiry {
3238 fn expire_after_create(
3239 &self,
3240 _key: &&str,
3241 _value: &&str,
3242 _current_time: StdInstant,
3243 ) -> Option<Duration> {
3244 self.counters.incl_actual_creations();
3245 None
3246 }
3247
3248 fn expire_after_read(
3249 &self,
3250 _key: &&str,
3251 _value: &&str,
3252 _current_time: StdInstant,
3253 _current_duration: Option<Duration>,
3254 _last_modified_at: StdInstant,
3255 ) -> Option<Duration> {
3256 self.counters.incl_actual_reads();
3257 None
3258 }
3259
3260 fn expire_after_update(
3261 &self,
3262 _key: &&str,
3263 _value: &&str,
3264 _current_time: StdInstant,
3265 _current_duration: Option<Duration>,
3266 ) -> Option<Duration> {
3267 unreachable!("The `expire_after_update()` method should not be called.");
3268 }
3269 }
3270
3271 // Create expiry counters and the expiry.
3272 let expiry_counters = Arc::new(ExpiryCallCounters::default());
3273 let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
3274
3275 // Create a cache with the expiry and eviction listener.
3276 let mut cache = Cache::builder()
3277 .max_capacity(100)
3278 .expire_after(expiry)
3279 .build();
3280 cache.reconfigure_for_testing().await;
3281
3282 // Make the cache exterior immutable.
3283 let cache = cache;
3284
3285 // The key is not present.
3286 cache.get_with("a", async { "alice" }).await;
3287 expiry_counters.incl_expected_creations();
3288 cache.run_pending_tasks().await;
3289
3290 // The key is present.
3291 cache.get_with("a", async { "alex" }).await;
3292 expiry_counters.incl_expected_reads();
3293 cache.run_pending_tasks().await;
3294
3295 // The key is not present.
3296 cache.invalidate("a").await;
3297 cache.get_with("a", async { "amanda" }).await;
3298 expiry_counters.incl_expected_creations();
3299 cache.run_pending_tasks().await;
3300
3301 expiry_counters.verify();
3302 }
3303
3304 // https://github.com/moka-rs/moka/issues/345
3305 #[tokio::test]
3306 async fn test_race_between_updating_entry_and_processing_its_write_ops() {
3307 let (clock, mock) = Clock::mock();
3308 let cache = Cache::builder()
3309 .max_capacity(2)
3310 .time_to_idle(Duration::from_secs(1))
3311 .clock(clock)
3312 .build();
3313
3314 cache.insert("a", "alice").await;
3315 cache.insert("b", "bob").await;
3316 cache.insert("c", "cathy").await; // c1
3317 mock.increment(Duration::from_secs(2));
3318
3319 // The following `insert` will do the following:
3320 // 1. Replaces the current "c" (c1) in the concurrent hash table (cht).
3321 // 2. Runs the pending tasks implicitly.
3322 //    (1) "a" will be admitted.
3323 //    (2) "b" will be admitted.
3324 //    (3) c1 will be evicted by the size constraint.
3325 //    (4) "a" will be evicted due to expiration.
3326 //    (5) "b" will be evicted due to expiration.
3327 // 3. Sends its `WriteOp` log to the channel.
3328 cache.insert("c", "cindy").await; // c2
3329
3330 // Remove "c" (c2) from the cht.
3331 assert_eq!(cache.remove(&"c").await, Some("cindy")); // c-remove
3332
3333 mock.increment(Duration::from_secs(2));
3334
3335 // The following `run_pending_tasks` will do the following:
3336 // 1. Admits "c" (c2) to the cache. (Creates a node in the LRU deque.)
3337 // 2. Because of c-remove, removes c2's node from the LRU deque.
3338 cache.run_pending_tasks().await;
3339 assert_eq!(cache.entry_count(), 0);
3340 }
3341
3342 #[tokio::test]
3343 async fn test_race_between_recreating_entry_and_processing_its_write_ops() {
3344 let cache = Cache::builder().max_capacity(2).build();
3345
3346 cache.insert('a', "a").await;
3347 cache.insert('b', "b").await;
3348 cache.run_pending_tasks().await;
3349
3350 cache.insert('c', "c1").await; // (a) `EntryInfo` 1, gen: 1
3351 assert!(cache.remove(&'a').await.is_some()); // (b)
3352 assert!(cache.remove(&'b').await.is_some()); // (c)
3353 assert!(cache.remove(&'c').await.is_some()); // (d) `EntryInfo` 1, gen: 2
3354 cache.insert('c', "c2").await; // (e) `EntryInfo` 2, gen: 1
3355
3356 // Now the `write_op_ch` channel contains the following `WriteOp`s:
3357 //
3358 // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
3359 // - 1: (b) remove "a"
3360 // - 2: (c) remove "b"
3361 // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
3362 // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
3363 //
3364         // Op 0 for "c1" is going to be rejected because the cache is full. Ensure
3365         // that processing op 0 does not remove "c2" from the concurrent hash table.
3366         // (Their gens are the same, but the `EntryInfo`s are different.)
3367 cache.run_pending_tasks().await;
3368 assert_eq!(cache.get(&'c').await, Some("c2"));
3369 }
3370
3371 #[tokio::test]
3372 async fn test_iter() {
3373 const NUM_KEYS: usize = 50;
3374
3375 fn make_value(key: usize) -> String {
3376 format!("val: {key}")
3377 }
3378
3379 let cache = Cache::builder()
3380 .max_capacity(100)
3381 .time_to_idle(Duration::from_secs(10))
3382 .build();
3383
3384 for key in 0..NUM_KEYS {
3385 cache.insert(key, make_value(key)).await;
3386 }
3387
3388 let mut key_set = std::collections::HashSet::new();
3389
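        // Iterate over the cache. Each item is an `(Arc<K>, V)` pair, so the key is
        // dereferenced before it is compared and collected.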
3390 for (key, value) in &cache {
3391 assert_eq!(value, make_value(*key));
3392
3393 key_set.insert(*key);
3394 }
3395
3396 // Ensure there are no missing or duplicate keys in the iteration.
3397 assert_eq!(key_set.len(), NUM_KEYS);
3398 }
3399
3400 /// Runs 16 async tasks at the same time and ensures no deadlock occurs.
3401 ///
3402     /// - Eight of the tasks will update key-values in the cache.
3403 /// - Eight others will iterate the cache.
3404 ///
3405 #[tokio::test]
3406 async fn test_iter_multi_async_tasks() {
3407 use std::collections::HashSet;
3408
3409 const NUM_KEYS: usize = 1024;
3410 const NUM_TASKS: usize = 16;
3411
3412 fn make_value(key: usize) -> String {
3413 format!("val: {key}")
3414 }
3415
3416 let cache = Cache::builder()
3417 .max_capacity(2048)
3418 .time_to_idle(Duration::from_secs(10))
3419 .build();
3420
3421 // Initialize the cache.
3422 for key in 0..NUM_KEYS {
3423 cache.insert(key, make_value(key)).await;
3424 }
3425
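        // Use the RwLock as a starting gate: the current task holds the write lock
        // while spawning, and every spawned task acquires a read lock first, so none
        // of them starts working until the write lock is dropped below.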
3426 let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
3427 let write_lock = rw_lock.write().await;
3428
3429 let tasks = (0..NUM_TASKS)
3430 .map(|n| {
3431 let cache = cache.clone();
3432 let rw_lock = Arc::clone(&rw_lock);
3433
3434 if n % 2 == 0 {
3435                     // This task will update the cache.
3436 tokio::spawn(async move {
3437 let read_lock = rw_lock.read().await;
3438 for key in 0..NUM_KEYS {
3439 // TODO: Update keys in a random order?
3440 cache.insert(key, make_value(key)).await;
3441 }
3442 std::mem::drop(read_lock);
3443 })
3444 } else {
3445                     // This task will iterate the cache.
3446 tokio::spawn(async move {
3447 let read_lock = rw_lock.read().await;
3448 let mut key_set = HashSet::new();
3449 // let mut key_count = 0usize;
3450 for (key, value) in &cache {
3451 assert_eq!(value, make_value(*key));
3452 key_set.insert(*key);
3453 // key_count += 1;
3454 }
3455 // Ensure there are no missing or duplicate keys in the iteration.
3456 assert_eq!(key_set.len(), NUM_KEYS);
3457 std::mem::drop(read_lock);
3458 })
3459 }
3460 })
3461 .collect::<Vec<_>>();
3462
3463         // Let these tasks run by releasing the write lock.
3464 std::mem::drop(write_lock);
3465
3466 let _ = futures_util::future::join_all(tasks).await;
3467
3468 // Ensure there are no missing or duplicate keys in the iteration.
3469 let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
3470 assert_eq!(key_set.len(), NUM_KEYS);
3471 }
3472
3473 #[tokio::test]
3474 async fn get_with() {
3475 let cache = Cache::new(100);
3476 const KEY: u32 = 0;
3477
3478 // This test will run five async tasks:
3479 //
3480 // Task1 will be the first task to call `get_with` for a key, so its async
3481 // block will be evaluated and then a &str value "task1" will be inserted to
3482 // the cache.
3483 let task1 = {
3484 let cache1 = cache.clone();
3485 async move {
3486 // Call `get_with` immediately.
3487 let v = cache1
3488 .get_with(KEY, async {
3489 // Wait for 300 ms and return a &str value.
3490 sleep(Duration::from_millis(300)).await;
3491 "task1"
3492 })
3493 .await;
3494 assert_eq!(v, "task1");
3495 }
3496 };
3497
3498 // Task2 will be the second task to call `get_with` for the same key, so its
3499 // async block will not be evaluated. Once task1's async block finishes, it
3500 // will get the value inserted by task1's async block.
3501 let task2 = {
3502 let cache2 = cache.clone();
3503 async move {
3504 // Wait for 100 ms before calling `get_with`.
3505 sleep(Duration::from_millis(100)).await;
3506 let v = cache2.get_with(KEY, async { unreachable!() }).await;
3507 assert_eq!(v, "task1");
3508 }
3509 };
3510
3511 // Task3 will be the third task to call `get_with` for the same key. By the
3512 // time it calls, task1's async block should have finished already and the
3513 // value should be already inserted to the cache. So its async block will not
3514 // be evaluated and will get the value inserted by task1's async block
3515 // immediately.
3516 let task3 = {
3517 let cache3 = cache.clone();
3518 async move {
3519 // Wait for 400 ms before calling `get_with`.
3520 sleep(Duration::from_millis(400)).await;
3521 let v = cache3.get_with(KEY, async { unreachable!() }).await;
3522 assert_eq!(v, "task1");
3523 }
3524 };
3525
3526 // Task4 will call `get` for the same key. It will call when task1's async
3527 // block is still running, so it will get none for the key.
3528 let task4 = {
3529 let cache4 = cache.clone();
3530 async move {
3531 // Wait for 200 ms before calling `get`.
3532 sleep(Duration::from_millis(200)).await;
3533 let maybe_v = cache4.get(&KEY).await;
3534 assert!(maybe_v.is_none());
3535 }
3536 };
3537
3538 // Task5 will call `get` for the same key. It will call after task1's async
3539         // block finished, so it will get the value inserted by task1's async block.
3540 let task5 = {
3541 let cache5 = cache.clone();
3542 async move {
3543 // Wait for 400 ms before calling `get`.
3544 sleep(Duration::from_millis(400)).await;
3545 let maybe_v = cache5.get(&KEY).await;
3546 assert_eq!(maybe_v, Some("task1"));
3547 }
3548 };
3549
3550 futures_util::join!(task1, task2, task3, task4, task5);
3551
3552 assert!(cache.is_waiter_map_empty());
3553 }
3554
3555 #[tokio::test]
3556 async fn get_with_by_ref() {
3557 let cache = Cache::new(100);
3558 const KEY: &u32 = &0;
3559
3560 // This test will run five async tasks:
3561 //
3562 // Task1 will be the first task to call `get_with_by_ref` for a key, so its async
3563 // block will be evaluated and then a &str value "task1" will be inserted to
3564 // the cache.
3565 let task1 = {
3566 let cache1 = cache.clone();
3567 async move {
3568 // Call `get_with_by_ref` immediately.
3569 let v = cache1
3570 .get_with_by_ref(KEY, async {
3571 // Wait for 300 ms and return a &str value.
3572 sleep(Duration::from_millis(300)).await;
3573 "task1"
3574 })
3575 .await;
3576 assert_eq!(v, "task1");
3577 }
3578 };
3579
3580 // Task2 will be the second task to call `get_with_by_ref` for the same key, so its
3581 // async block will not be evaluated. Once task1's async block finishes, it
3582 // will get the value inserted by task1's async block.
3583 let task2 = {
3584 let cache2 = cache.clone();
3585 async move {
3586 // Wait for 100 ms before calling `get_with_by_ref`.
3587 sleep(Duration::from_millis(100)).await;
3588 let v = cache2.get_with_by_ref(KEY, async { unreachable!() }).await;
3589 assert_eq!(v, "task1");
3590 }
3591 };
3592
3593 // Task3 will be the third task to call `get_with_by_ref` for the same key. By the
3594 // time it calls, task1's async block should have finished already and the
3595 // value should be already inserted to the cache. So its async block will not
3596 // be evaluated and will get the value inserted by task1's async block
3597 // immediately.
3598 let task3 = {
3599 let cache3 = cache.clone();
3600 async move {
3601 // Wait for 400 ms before calling `get_with_by_ref`.
3602 sleep(Duration::from_millis(400)).await;
3603 let v = cache3.get_with_by_ref(KEY, async { unreachable!() }).await;
3604 assert_eq!(v, "task1");
3605 }
3606 };
3607
3608 // Task4 will call `get` for the same key. It will call when task1's async
3609 // block is still running, so it will get none for the key.
3610 let task4 = {
3611 let cache4 = cache.clone();
3612 async move {
3613 // Wait for 200 ms before calling `get`.
3614 sleep(Duration::from_millis(200)).await;
3615 let maybe_v = cache4.get(KEY).await;
3616 assert!(maybe_v.is_none());
3617 }
3618 };
3619
3620 // Task5 will call `get` for the same key. It will call after task1's async
3621         // block finished, so it will get the value inserted by task1's async block.
3622 let task5 = {
3623 let cache5 = cache.clone();
3624 async move {
3625 // Wait for 400 ms before calling `get`.
3626 sleep(Duration::from_millis(400)).await;
3627 let maybe_v = cache5.get(KEY).await;
3628 assert_eq!(maybe_v, Some("task1"));
3629 }
3630 };
3631
3632 futures_util::join!(task1, task2, task3, task4, task5);
3633
3634 assert!(cache.is_waiter_map_empty());
3635 }
3636
3637 #[tokio::test]
3638 async fn entry_or_insert_with_if() {
3639 let cache = Cache::new(100);
3640 const KEY: u32 = 0;
3641
3642 // This test will run seven async tasks:
3643 //
3644 // Task1 will be the first task to call `or_insert_with_if` for a key, so its
3645 // async block will be evaluated and then a &str value "task1" will be
3646 // inserted to the cache.
3647 let task1 = {
3648 let cache1 = cache.clone();
3649 async move {
3650 // Call `or_insert_with_if` immediately.
3651 let entry = cache1
3652 .entry(KEY)
3653 .or_insert_with_if(
3654 async {
3655 // Wait for 300 ms and return a &str value.
3656 sleep(Duration::from_millis(300)).await;
3657 "task1"
3658 },
3659 |_v| unreachable!(),
3660 )
3661 .await;
3662 // Entry should be fresh because our async block should have been
3663 // evaluated.
3664 assert!(entry.is_fresh());
3665 assert_eq!(entry.into_value(), "task1");
3666 }
3667 };
3668
3669 // Task2 will be the second task to call `or_insert_with_if` for the same
3670 // key, so its async block will not be evaluated. Once task1's async block
3671 // finishes, it will get the value inserted by task1's async block.
3672 let task2 = {
3673 let cache2 = cache.clone();
3674 async move {
3675 // Wait for 100 ms before calling `or_insert_with_if`.
3676 sleep(Duration::from_millis(100)).await;
3677 let entry = cache2
3678 .entry(KEY)
3679 .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
3680 .await;
3681 // Entry should not be fresh because task1's async block should have
3682 // been evaluated instead of ours.
3683 assert!(!entry.is_fresh());
3684 assert_eq!(entry.into_value(), "task1");
3685 }
3686 };
3687
3688 // Task3 will be the third task to call `or_insert_with_if` for the same key.
3689 // By the time it calls, task1's async block should have finished already and
3690 // the value should be already inserted to the cache. Also task3's
3691 // `replace_if` closure returns `false`. So its async block will not be
3692 // evaluated and will get the value inserted by task1's async block
3693 // immediately.
3694 let task3 = {
3695 let cache3 = cache.clone();
3696 async move {
3697 // Wait for 350 ms before calling `or_insert_with_if`.
3698 sleep(Duration::from_millis(350)).await;
3699 let entry = cache3
3700 .entry(KEY)
3701 .or_insert_with_if(async { unreachable!() }, |v| {
3702 assert_eq!(v, &"task1");
3703 false
3704 })
3705 .await;
3706 assert!(!entry.is_fresh());
3707 assert_eq!(entry.into_value(), "task1");
3708 }
3709 };
3710
3711 // Task4 will be the fourth task to call `or_insert_with_if` for the same
3712 // key. The value should have been already inserted to the cache by task1.
3713 // However task4's `replace_if` closure returns `true`. So its async block
3714 // will be evaluated to replace the current value.
3715 let task4 = {
3716 let cache4 = cache.clone();
3717 async move {
3718 // Wait for 400 ms before calling `or_insert_with_if`.
3719 sleep(Duration::from_millis(400)).await;
3720 let entry = cache4
3721 .entry(KEY)
3722 .or_insert_with_if(async { "task4" }, |v| {
3723 assert_eq!(v, &"task1");
3724 true
3725 })
3726 .await;
3727 assert!(entry.is_fresh());
3728 assert_eq!(entry.into_value(), "task4");
3729 }
3730 };
3731
3732 // Task5 will call `get` for the same key. It will call when task1's async
3733 // block is still running, so it will get none for the key.
3734 let task5 = {
3735 let cache5 = cache.clone();
3736 async move {
3737 // Wait for 200 ms before calling `get`.
3738 sleep(Duration::from_millis(200)).await;
3739 let maybe_v = cache5.get(&KEY).await;
3740 assert!(maybe_v.is_none());
3741 }
3742 };
3743
3744 // Task6 will call `get` for the same key. It will call after task1's async
3745         // block finished, so it will get the value inserted by task1's async block.
3746 let task6 = {
3747 let cache6 = cache.clone();
3748 async move {
3749 // Wait for 350 ms before calling `get`.
3750 sleep(Duration::from_millis(350)).await;
3751 let maybe_v = cache6.get(&KEY).await;
3752 assert_eq!(maybe_v, Some("task1"));
3753 }
3754 };
3755
3756 // Task7 will call `get` for the same key. It will call after task4's async
3757         // block finished, so it will get the value inserted by task4's async block.
3758 let task7 = {
3759 let cache7 = cache.clone();
3760 async move {
3761 // Wait for 450 ms before calling `get`.
3762 sleep(Duration::from_millis(450)).await;
3763 let maybe_v = cache7.get(&KEY).await;
3764 assert_eq!(maybe_v, Some("task4"));
3765 }
3766 };
3767
3768 futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
3769
3770 assert!(cache.is_waiter_map_empty());
3771 }
3772
3773 #[tokio::test]
3774 async fn entry_by_ref_or_insert_with_if() {
3775 let cache = Cache::new(100);
3776 const KEY: &u32 = &0;
3777
3778 // This test will run seven async tasks:
3779 //
3780 // Task1 will be the first task to call `or_insert_with_if` for a key, so its
3781 // async block will be evaluated and then a &str value "task1" will be
3782 // inserted to the cache.
3783 let task1 = {
3784 let cache1 = cache.clone();
3785 async move {
3786 // Call `or_insert_with_if` immediately.
3787 let entry = cache1
3788 .entry_by_ref(KEY)
3789 .or_insert_with_if(
3790 async {
3791 // Wait for 300 ms and return a &str value.
3792 sleep(Duration::from_millis(300)).await;
3793 "task1"
3794 },
3795 |_v| unreachable!(),
3796 )
3797 .await;
3798 // Entry should be fresh because our async block should have been
3799 // evaluated.
3800 assert!(entry.is_fresh());
3801 assert_eq!(entry.into_value(), "task1");
3802 }
3803 };
3804
3805 // Task2 will be the second task to call `or_insert_with_if` for the same
3806 // key, so its async block will not be evaluated. Once task1's async block
3807 // finishes, it will get the value inserted by task1's async block.
3808 let task2 = {
3809 let cache2 = cache.clone();
3810 async move {
3811 // Wait for 100 ms before calling `or_insert_with_if`.
3812 sleep(Duration::from_millis(100)).await;
3813 let entry = cache2
3814 .entry_by_ref(KEY)
3815 .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
3816 .await;
3817 // Entry should not be fresh because task1's async block should have
3818 // been evaluated instead of ours.
3819 assert!(!entry.is_fresh());
3820 assert_eq!(entry.into_value(), "task1");
3821 }
3822 };
3823
3824 // Task3 will be the third task to call `or_insert_with_if` for the same key.
3825 // By the time it calls, task1's async block should have finished already and
3826 // the value should be already inserted to the cache. Also task3's
3827 // `replace_if` closure returns `false`. So its async block will not be
3828 // evaluated and will get the value inserted by task1's async block
3829 // immediately.
3830 let task3 = {
3831 let cache3 = cache.clone();
3832 async move {
3833 // Wait for 350 ms before calling `or_insert_with_if`.
3834 sleep(Duration::from_millis(350)).await;
3835 let entry = cache3
3836 .entry_by_ref(KEY)
3837 .or_insert_with_if(async { unreachable!() }, |v| {
3838 assert_eq!(v, &"task1");
3839 false
3840 })
3841 .await;
3842 assert!(!entry.is_fresh());
3843 assert_eq!(entry.into_value(), "task1");
3844 }
3845 };
3846
3847 // Task4 will be the fourth task to call `or_insert_with_if` for the same
3848 // key. The value should have been already inserted to the cache by task1.
3849 // However task4's `replace_if` closure returns `true`. So its async block
3850 // will be evaluated to replace the current value.
3851 let task4 = {
3852 let cache4 = cache.clone();
3853 async move {
3854 // Wait for 400 ms before calling `or_insert_with_if`.
3855 sleep(Duration::from_millis(400)).await;
3856 let entry = cache4
3857 .entry_by_ref(KEY)
3858 .or_insert_with_if(async { "task4" }, |v| {
3859 assert_eq!(v, &"task1");
3860 true
3861 })
3862 .await;
3863 assert!(entry.is_fresh());
3864 assert_eq!(entry.into_value(), "task4");
3865 }
3866 };
3867
3868 // Task5 will call `get` for the same key. It will call when task1's async
3869 // block is still running, so it will get none for the key.
3870 let task5 = {
3871 let cache5 = cache.clone();
3872 async move {
3873 // Wait for 200 ms before calling `get`.
3874 sleep(Duration::from_millis(200)).await;
3875 let maybe_v = cache5.get(KEY).await;
3876 assert!(maybe_v.is_none());
3877 }
3878 };
3879
3880 // Task6 will call `get` for the same key. It will call after task1's async
3881         // block finished, so it will get the value inserted by task1's async block.
3882 let task6 = {
3883 let cache6 = cache.clone();
3884 async move {
3885 // Wait for 350 ms before calling `get`.
3886 sleep(Duration::from_millis(350)).await;
3887 let maybe_v = cache6.get(KEY).await;
3888 assert_eq!(maybe_v, Some("task1"));
3889 }
3890 };
3891
3892 // Task7 will call `get` for the same key. It will call after task4's async
3893         // block finished, so it will get the value inserted by task4's async block.
3894 let task7 = {
3895 let cache7 = cache.clone();
3896 async move {
3897 // Wait for 450 ms before calling `get`.
3898 sleep(Duration::from_millis(450)).await;
3899 let maybe_v = cache7.get(KEY).await;
3900 assert_eq!(maybe_v, Some("task4"));
3901 }
3902 };
3903
3904 futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
3905
3906 assert!(cache.is_waiter_map_empty());
3907 }
3908
3909 #[tokio::test]
3910 async fn try_get_with() {
3911 use std::sync::Arc;
3912
3913         // Note that MyError, like anyhow::Error, does not implement the
3914         // std::error::Error trait.
3915 #[derive(Debug)]
3916 pub struct MyError(#[allow(dead_code)] String);
3917
3918 type MyResult<T> = Result<T, Arc<MyError>>;
3919
3920 let cache = Cache::new(100);
3921 const KEY: u32 = 0;
3922
3923 // This test will run eight async tasks:
3924 //
3925         // Task1 will be the first task to call `try_get_with` for a key, so its async
3926 // block will be evaluated and then an error will be returned. Nothing will
3927 // be inserted to the cache.
3928 let task1 = {
3929 let cache1 = cache.clone();
3930 async move {
3931 // Call `try_get_with` immediately.
3932 let v = cache1
3933 .try_get_with(KEY, async {
3934 // Wait for 300 ms and return an error.
3935 sleep(Duration::from_millis(300)).await;
3936 Err(MyError("task1 error".into()))
3937 })
3938 .await;
3939 assert!(v.is_err());
3940 }
3941 };
3942
3943         // Task2 will be the second task to call `try_get_with` for the same key, so its
3944 // async block will not be evaluated. Once task1's async block finishes, it
3945 // will get the same error value returned by task1's async block.
3946 let task2 = {
3947 let cache2 = cache.clone();
3948 async move {
3949 // Wait for 100 ms before calling `try_get_with`.
3950 sleep(Duration::from_millis(100)).await;
3951 let v: MyResult<_> = cache2.try_get_with(KEY, async { unreachable!() }).await;
3952 assert!(v.is_err());
3953 }
3954 };
3955
3956         // Task3 will be the third task to call `try_get_with` for the same key. By the
3957         // time it calls, task1's async block should have finished already, but the
3958         // key still does not exist in the cache. So its async block will be
3959         // evaluated and then an `Ok(&str)` value will be returned. That value will be
3960 // inserted to the cache.
3961 let task3 = {
3962 let cache3 = cache.clone();
3963 async move {
3964 // Wait for 400 ms before calling `try_get_with`.
3965 sleep(Duration::from_millis(400)).await;
3966 let v: MyResult<_> = cache3
3967 .try_get_with(KEY, async {
3968 // Wait for 300 ms and return an Ok(&str) value.
3969 sleep(Duration::from_millis(300)).await;
3970 Ok("task3")
3971 })
3972 .await;
3973 assert_eq!(v.unwrap(), "task3");
3974 }
3975 };
3976
3977         // Task4 will be the fourth task to call `try_get_with` for the same key. So its
3978         // async block will not be evaluated. Once task3's async block finishes, it
3979         // will get the same `Ok(&str)` value.
3980 let task4 = {
3981 let cache4 = cache.clone();
3982 async move {
3983 // Wait for 500 ms before calling `try_get_with`.
3984 sleep(Duration::from_millis(500)).await;
3985 let v: MyResult<_> = cache4.try_get_with(KEY, async { unreachable!() }).await;
3986 assert_eq!(v.unwrap(), "task3");
3987 }
3988 };
3989
3990         // Task5 will be the fifth task to call `try_get_with` for the same key. So its
3991         // async block will not be evaluated. By the time it calls, task3's async
3992         // block should have finished already, so its async block will not be
3993         // evaluated and will get the value inserted by task3's async block
3994 // immediately.
3995 let task5 = {
3996 let cache5 = cache.clone();
3997 async move {
3998 // Wait for 800 ms before calling `try_get_with`.
3999 sleep(Duration::from_millis(800)).await;
4000 let v: MyResult<_> = cache5.try_get_with(KEY, async { unreachable!() }).await;
4001 assert_eq!(v.unwrap(), "task3");
4002 }
4003 };
4004
4005 // Task6 will call `get` for the same key. It will call when task1's async
4006 // block is still running, so it will get none for the key.
4007 let task6 = {
4008 let cache6 = cache.clone();
4009 async move {
4010 // Wait for 200 ms before calling `get`.
4011 sleep(Duration::from_millis(200)).await;
4012 let maybe_v = cache6.get(&KEY).await;
4013 assert!(maybe_v.is_none());
4014 }
4015 };
4016
4017 // Task7 will call `get` for the same key. It will call after task1's async
4018 // block finished with an error. So it will get none for the key.
4019 let task7 = {
4020 let cache7 = cache.clone();
4021 async move {
4022 // Wait for 400 ms before calling `get`.
4023 sleep(Duration::from_millis(400)).await;
4024 let maybe_v = cache7.get(&KEY).await;
4025 assert!(maybe_v.is_none());
4026 }
4027 };
4028
4029 // Task8 will call `get` for the same key. It will call after task3's async
4030         // block finished, so it will get the value inserted by task3's async block.
4031 let task8 = {
4032 let cache8 = cache.clone();
4033 async move {
4034 // Wait for 800 ms before calling `get`.
4035 sleep(Duration::from_millis(800)).await;
4036 let maybe_v = cache8.get(&KEY).await;
4037 assert_eq!(maybe_v, Some("task3"));
4038 }
4039 };
4040
4041 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4042
4043 assert!(cache.is_waiter_map_empty());
4044 }
4045
4046 #[tokio::test]
4047 async fn try_get_with_by_ref() {
4048 use std::sync::Arc;
4049
4050         // Note that MyError, like anyhow::Error, does not implement the
4051         // std::error::Error trait.
4052 #[derive(Debug)]
4053 pub struct MyError(#[allow(dead_code)] String);
4054
4055 type MyResult<T> = Result<T, Arc<MyError>>;
4056
4057 let cache = Cache::new(100);
4058 const KEY: &u32 = &0;
4059
4060 // This test will run eight async tasks:
4061 //
4062 // Task1 will be the first task to call `try_get_with_by_ref` for a key, so
4063 // its async block will be evaluated and then an error will be returned.
4064 // Nothing will be inserted to the cache.
4065 let task1 = {
4066 let cache1 = cache.clone();
4067 async move {
4068 // Call `try_get_with_by_ref` immediately.
4069 let v = cache1
4070 .try_get_with_by_ref(KEY, async {
4071 // Wait for 300 ms and return an error.
4072 sleep(Duration::from_millis(300)).await;
4073 Err(MyError("task1 error".into()))
4074 })
4075 .await;
4076 assert!(v.is_err());
4077 }
4078 };
4079
4080         // Task2 will be the second task to call `try_get_with_by_ref` for the same key, so its
4081 // async block will not be evaluated. Once task1's async block finishes, it
4082 // will get the same error value returned by task1's async block.
4083 let task2 = {
4084 let cache2 = cache.clone();
4085 async move {
4086 // Wait for 100 ms before calling `try_get_with_by_ref`.
4087 sleep(Duration::from_millis(100)).await;
4088 let v: MyResult<_> = cache2
4089 .try_get_with_by_ref(KEY, async { unreachable!() })
4090 .await;
4091 assert!(v.is_err());
4092 }
4093 };
4094
4095         // Task3 will be the third task to call `try_get_with_by_ref` for the same key.
4096         // By the time it calls, task1's async block should have finished already, but the
4097         // key still does not exist in the cache. So its async block will be
4098         // evaluated and then an `Ok(&str)` value will be returned. That value will be
4099 // inserted to the cache.
4100 let task3 = {
4101 let cache3 = cache.clone();
4102 async move {
4103 // Wait for 400 ms before calling `try_get_with_by_ref`.
4104 sleep(Duration::from_millis(400)).await;
4105 let v: MyResult<_> = cache3
4106 .try_get_with_by_ref(KEY, async {
4107 // Wait for 300 ms and return an Ok(&str) value.
4108 sleep(Duration::from_millis(300)).await;
4109 Ok("task3")
4110 })
4111 .await;
4112 assert_eq!(v.unwrap(), "task3");
4113 }
4114 };
4115
4116         // Task4 will be the fourth task to call `try_get_with_by_ref` for the same key.
4117         // So its async block will not be evaluated. Once task3's async block finishes,
4118         // it will get the same `Ok(&str)` value.
4119 let task4 = {
4120 let cache4 = cache.clone();
4121 async move {
4122 // Wait for 500 ms before calling `try_get_with_by_ref`.
4123 sleep(Duration::from_millis(500)).await;
4124 let v: MyResult<_> = cache4
4125 .try_get_with_by_ref(KEY, async { unreachable!() })
4126 .await;
4127 assert_eq!(v.unwrap(), "task3");
4128 }
4129 };
4130
4131         // Task5 will be the fifth task to call `try_get_with_by_ref` for the same key.
4132         // So its async block will not be evaluated. By the time it calls, task3's async
4133         // block should have finished already, so its async block will not be
4134         // evaluated and will get the value inserted by task3's async block
4135 // immediately.
4136 let task5 = {
4137 let cache5 = cache.clone();
4138 async move {
4139 // Wait for 800 ms before calling `try_get_with_by_ref`.
4140 sleep(Duration::from_millis(800)).await;
4141 let v: MyResult<_> = cache5
4142 .try_get_with_by_ref(KEY, async { unreachable!() })
4143 .await;
4144 assert_eq!(v.unwrap(), "task3");
4145 }
4146 };
4147
4148 // Task6 will call `get` for the same key. It will call when task1's async
4149 // block is still running, so it will get none for the key.
4150 let task6 = {
4151 let cache6 = cache.clone();
4152 async move {
4153 // Wait for 200 ms before calling `get`.
4154 sleep(Duration::from_millis(200)).await;
4155 let maybe_v = cache6.get(KEY).await;
4156 assert!(maybe_v.is_none());
4157 }
4158 };
4159
4160 // Task7 will call `get` for the same key. It will call after task1's async
4161 // block finished with an error. So it will get none for the key.
4162 let task7 = {
4163 let cache7 = cache.clone();
4164 async move {
4165 // Wait for 400 ms before calling `get`.
4166 sleep(Duration::from_millis(400)).await;
4167 let maybe_v = cache7.get(KEY).await;
4168 assert!(maybe_v.is_none());
4169 }
4170 };
4171
4172 // Task8 will call `get` for the same key. It will call after task3's async
4173         // block finished, so it will get the value inserted by task3's async block.
4174 let task8 = {
4175 let cache8 = cache.clone();
4176 async move {
4177 // Wait for 800 ms before calling `get`.
4178 sleep(Duration::from_millis(800)).await;
4179 let maybe_v = cache8.get(KEY).await;
4180 assert_eq!(maybe_v, Some("task3"));
4181 }
4182 };
4183
4184 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4185
4186 assert!(cache.is_waiter_map_empty());
4187 }
4188
4189 #[tokio::test]
4190 async fn optionally_get_with() {
4191 let cache = Cache::new(100);
4192 const KEY: u32 = 0;
4193
4194 // This test will run eight async tasks:
4195 //
4196 // Task1 will be the first task to call `optionally_get_with` for a key,
4197         // so its async block will be evaluated and then `None` will be
4198 // returned. Nothing will be inserted to the cache.
4199 let task1 = {
4200 let cache1 = cache.clone();
4201 async move {
4202                 // Call `optionally_get_with` immediately.
4203 let v = cache1
4204 .optionally_get_with(KEY, async {
4205                         // Wait for 300 ms and return `None`.
4206 sleep(Duration::from_millis(300)).await;
4207 None
4208 })
4209 .await;
4210 assert!(v.is_none());
4211 }
4212 };
4213
4214 // Task2 will be the second task to call `optionally_get_with` for the same
4215 // key, so its async block will not be evaluated. Once task1's async block
4216         // finishes, it will get the same `None` returned by task1's async
4217 // block.
4218 let task2 = {
4219 let cache2 = cache.clone();
4220 async move {
4221 // Wait for 100 ms before calling `optionally_get_with`.
4222 sleep(Duration::from_millis(100)).await;
4223 let v = cache2
4224 .optionally_get_with(KEY, async { unreachable!() })
4225 .await;
4226 assert!(v.is_none());
4227 }
4228 };
4229
4230 // Task3 will be the third task to call `optionally_get_with` for the
4231 // same key. By the time it calls, task1's async block should have
4232 // finished already, but the key still does not exist in the cache. So
4233         // its async block will be evaluated and then a `Some(&str)` value will be
4234 // returned. That value will be inserted to the cache.
4235 let task3 = {
4236 let cache3 = cache.clone();
4237 async move {
4238 // Wait for 400 ms before calling `optionally_get_with`.
4239 sleep(Duration::from_millis(400)).await;
4240 let v = cache3
4241 .optionally_get_with(KEY, async {
4242                         // Wait for 300 ms and return a `Some(&str)` value.
4243 sleep(Duration::from_millis(300)).await;
4244 Some("task3")
4245 })
4246 .await;
4247 assert_eq!(v.unwrap(), "task3");
4248 }
4249 };
4250
4251 // Task4 will be the fourth task to call `optionally_get_with` for the
4252 // same key. So its async block will not be evaluated. Once task3's
4253         // async block finishes, it will get the same `Some(&str)` value.
4254 let task4 = {
4255 let cache4 = cache.clone();
4256 async move {
4257                 // Wait for 500 ms before calling `optionally_get_with`.
4258 sleep(Duration::from_millis(500)).await;
4259 let v = cache4
4260 .optionally_get_with(KEY, async { unreachable!() })
4261 .await;
4262 assert_eq!(v.unwrap(), "task3");
4263 }
4264 };
4265
4266 // Task5 will be the fifth task to call `optionally_get_with` for the
4267 // same key. So its async block will not be evaluated. By the time it
4268 // calls, task3's async block should have finished already, so its async
4269         // block will not be evaluated and will get the value inserted by task3's
4270 // async block immediately.
4271 let task5 = {
4272 let cache5 = cache.clone();
4273 async move {
4274 // Wait for 800 ms before calling `optionally_get_with`.
4275 sleep(Duration::from_millis(800)).await;
4276 let v = cache5
4277 .optionally_get_with(KEY, async { unreachable!() })
4278 .await;
4279 assert_eq!(v.unwrap(), "task3");
4280 }
4281 };
4282
4283 // Task6 will call `get` for the same key. It will call when task1's async
4284 // block is still running, so it will get none for the key.
4285 let task6 = {
4286 let cache6 = cache.clone();
4287 async move {
4288 // Wait for 200 ms before calling `get`.
4289 sleep(Duration::from_millis(200)).await;
4290 let maybe_v = cache6.get(&KEY).await;
4291 assert!(maybe_v.is_none());
4292 }
4293 };
4294
4295 // Task7 will call `get` for the same key. It will call after task1's async
4296         // block finished with `None`. So it will get none for the key.
4297 let task7 = {
4298 let cache7 = cache.clone();
4299 async move {
4300 // Wait for 400 ms before calling `get`.
4301 sleep(Duration::from_millis(400)).await;
4302 let maybe_v = cache7.get(&KEY).await;
4303 assert!(maybe_v.is_none());
4304 }
4305 };
4306
4307 // Task8 will call `get` for the same key. It will call after task3's async
4308         // block finished, so it will get the value inserted by task3's async block.
4309 let task8 = {
4310 let cache8 = cache.clone();
4311 async move {
4312 // Wait for 800 ms before calling `get`.
4313 sleep(Duration::from_millis(800)).await;
4314 let maybe_v = cache8.get(&KEY).await;
4315 assert_eq!(maybe_v, Some("task3"));
4316 }
4317 };
4318
4319 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4320 }
4321
4322 #[tokio::test]
4323 async fn optionally_get_with_by_ref() {
4324 let cache = Cache::new(100);
4325 const KEY: &u32 = &0;
4326
4327 // This test will run eight async tasks:
4328 //
4329 // Task1 will be the first task to call `optionally_get_with_by_ref` for a
4330         // key, so its async block will be evaluated and then `None` will be
4331 // returned. Nothing will be inserted to the cache.
4332 let task1 = {
4333 let cache1 = cache.clone();
4334 async move {
4335                 // Call `optionally_get_with_by_ref` immediately.
4336 let v = cache1
4337 .optionally_get_with_by_ref(KEY, async {
4338                         // Wait for 300 ms and return `None`.
4339 sleep(Duration::from_millis(300)).await;
4340 None
4341 })
4342 .await;
4343 assert!(v.is_none());
4344 }
4345 };
4346
4347 // Task2 will be the second task to call `optionally_get_with_by_ref` for the
4348 // same key, so its async block will not be evaluated. Once task1's async
4349         // block finishes, it will get the same `None` returned by task1's async
4350 // block.
4351 let task2 = {
4352 let cache2 = cache.clone();
4353 async move {
4354 // Wait for 100 ms before calling `optionally_get_with_by_ref`.
4355 sleep(Duration::from_millis(100)).await;
4356 let v = cache2
4357 .optionally_get_with_by_ref(KEY, async { unreachable!() })
4358 .await;
4359 assert!(v.is_none());
4360 }
4361 };
4362
4363 // Task3 will be the third task to call `optionally_get_with_by_ref` for the
4364 // same key. By the time it calls, task1's async block should have
4365 // finished already, but the key still does not exist in the cache. So
4366         // its async block will be evaluated and then a `Some(&str)` value will be
4367 // returned. That value will be inserted to the cache.
4368 let task3 = {
4369 let cache3 = cache.clone();
4370 async move {
4371 // Wait for 400 ms before calling `optionally_get_with_by_ref`.
4372 sleep(Duration::from_millis(400)).await;
4373 let v = cache3
4374 .optionally_get_with_by_ref(KEY, async {
4375                         // Wait for 300 ms and return a `Some(&str)` value.
4376 sleep(Duration::from_millis(300)).await;
4377 Some("task3")
4378 })
4379 .await;
4380 assert_eq!(v.unwrap(), "task3");
4381 }
4382 };
4383
4384 // Task4 will be the fourth task to call `optionally_get_with_by_ref` for the
4385 // same key. So its async block will not be evaluated. Once task3's
4386         // async block finishes, it will get the same `Some(&str)` value.
4387 let task4 = {
4388 let cache4 = cache.clone();
4389 async move {
4390                 // Wait for 500 ms before calling `optionally_get_with_by_ref`.
4391 sleep(Duration::from_millis(500)).await;
4392 let v = cache4
4393 .optionally_get_with_by_ref(KEY, async { unreachable!() })
4394 .await;
4395 assert_eq!(v.unwrap(), "task3");
4396 }
4397 };
4398
4399 // Task5 will be the fifth task to call `optionally_get_with_by_ref` for the
4400 // same key. So its async block will not be evaluated. By the time it
4401 // calls, task3's async block should have finished already, so its async
4402         // block will not be evaluated and will get the value inserted by task3's
4403 // async block immediately.
4404 let task5 = {
4405 let cache5 = cache.clone();
4406 async move {
4407 // Wait for 800 ms before calling `optionally_get_with_by_ref`.
4408 sleep(Duration::from_millis(800)).await;
4409 let v = cache5
4410 .optionally_get_with_by_ref(KEY, async { unreachable!() })
4411 .await;
4412 assert_eq!(v.unwrap(), "task3");
4413 }
4414 };
4415
4416 // Task6 will call `get` for the same key. It will call when task1's async
4417 // block is still running, so it will get none for the key.
4418 let task6 = {
4419 let cache6 = cache.clone();
4420 async move {
4421 // Wait for 200 ms before calling `get`.
4422 sleep(Duration::from_millis(200)).await;
4423 let maybe_v = cache6.get(KEY).await;
4424 assert!(maybe_v.is_none());
4425 }
4426 };
4427
4428 // Task7 will call `get` for the same key. It will call after task1's async
4429         // block finished with `None`. So it will get none for the key.
4430 let task7 = {
4431 let cache7 = cache.clone();
4432 async move {
4433 // Wait for 400 ms before calling `get`.
4434 sleep(Duration::from_millis(400)).await;
4435 let maybe_v = cache7.get(KEY).await;
4436 assert!(maybe_v.is_none());
4437 }
4438 };
4439
4440 // Task8 will call `get` for the same key. It will call after task3's async
4441         // block finished, so it will get the value inserted by task3's async block.
4442 let task8 = {
4443 let cache8 = cache.clone();
4444 async move {
4445 // Wait for 800 ms before calling `get`.
4446 sleep(Duration::from_millis(800)).await;
4447 let maybe_v = cache8.get(KEY).await;
4448 assert_eq!(maybe_v, Some("task3"));
4449 }
4450 };
4451
4452 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4453
4454 assert!(cache.is_waiter_map_empty());
4455 }
4456
4457 #[tokio::test]
4458 async fn upsert_with() {
4459 let cache = Cache::new(100);
4460 const KEY: u32 = 0;
4461
4462 // Spawn three async tasks to call `and_upsert_with` for the same key and
4463 // each task increments the current value by 1. Ensure the key-level lock is
4464 // working by verifying the value is 3 after all tasks finish.
4465 //
4466 // | | task 1 | task 2 | task 3 |
4467 // |--------|----------|----------|----------|
4468 // | 0 ms | get none | | |
4469 // | 100 ms | | blocked | |
4470 // | 200 ms | insert 1 | | |
4471 // | | | get 1 | |
4472 // | 300 ms | | | blocked |
4473 // | 400 ms | | insert 2 | |
4474 // | | | | get 2 |
4475 // | 500 ms | | | insert 3 |
4476
4477 let task1 = {
4478 let cache1 = cache.clone();
4479 async move {
4480 cache1
4481 .entry(KEY)
4482 .and_upsert_with(|maybe_entry| async move {
4483 sleep(Duration::from_millis(200)).await;
4484 assert!(maybe_entry.is_none());
4485 1
4486 })
4487 .await
4488 }
4489 };
4490
4491 let task2 = {
4492 let cache2 = cache.clone();
4493 async move {
4494 sleep(Duration::from_millis(100)).await;
4495 cache2
4496 .entry_by_ref(&KEY)
4497 .and_upsert_with(|maybe_entry| async move {
4498 sleep(Duration::from_millis(200)).await;
4499 let entry = maybe_entry.expect("The entry should exist");
4500 entry.into_value() + 1
4501 })
4502 .await
4503 }
4504 };
4505
4506 let task3 = {
4507 let cache3 = cache.clone();
4508 async move {
4509 sleep(Duration::from_millis(300)).await;
4510 cache3
4511 .entry_by_ref(&KEY)
4512 .and_upsert_with(|maybe_entry| async move {
4513 sleep(Duration::from_millis(100)).await;
4514 let entry = maybe_entry.expect("The entry should exist");
4515 entry.into_value() + 1
4516 })
4517 .await
4518 }
4519 };
4520
4521 let (ent1, ent2, ent3) = futures_util::join!(task1, task2, task3);
4522 assert_eq!(ent1.into_value(), 1);
4523 assert_eq!(ent2.into_value(), 2);
4524 assert_eq!(ent3.into_value(), 3);
4525
4526 assert_eq!(cache.get(&KEY).await, Some(3));
4527
4528 assert!(cache.is_waiter_map_empty());
4529 }
4530
4531 #[tokio::test]
4532 async fn compute_with() {
4533 use crate::ops::compute;
4534 use tokio::sync::RwLock;
4535
4536 let cache = Cache::new(100);
4537 const KEY: u32 = 0;
4538
4539 // Spawn six async tasks to call `and_compute_with` for the same key. Ensure
4540 // the key-level lock is working by verifying the value after all tasks
4541 // finish.
4542 //
4543 // | | task 1 | task 2 | task 3 | task 4 | task 5 | task 6 |
4544 // |---------|------------|---------------|------------|----------|------------|---------|
4545 // | 0 ms | get none | | | | | |
4546 // | 100 ms | | blocked | | | | |
4547 // | 200 ms | insert [1] | | | | | |
4548 // | | | get [1] | | | | |
4549 // | 300 ms | | | blocked | | | |
4550 // | 400 ms | | insert [1, 2] | | | | |
4551 // | | | | get [1, 2] | | | |
4552 // | 500 ms | | | | blocked | | |
4553 // | 600 ms | | | remove | | | |
4554 // | | | | | get none | | |
4555 // | 700 ms | | | | | blocked | |
4556 // | 800 ms | | | | nop | | |
4557 // | | | | | | get none | |
4558 // | 900 ms | | | | | | blocked |
4559 // | 1000 ms | | | | | insert [5] | |
4560 // | | | | | | | get [5] |
4561 // | 1100 ms | | | | | | nop |
4562
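        // Note: the cached value is an `Arc<RwLock<Vec<i32>>>`, so a later task can
        // mutate the same `Vec` that an earlier task inserted. This is why the entry
        // returned to task1 ends up containing `[1, 2]` (see the assertions below).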
4563 let task1 = {
4564 let cache1 = cache.clone();
4565 async move {
4566 cache1
4567 .entry(KEY)
4568 .and_compute_with(|maybe_entry| async move {
4569 sleep(Duration::from_millis(200)).await;
4570 assert!(maybe_entry.is_none());
4571 compute::Op::Put(Arc::new(RwLock::new(vec![1])))
4572 })
4573 .await
4574 }
4575 };
4576
4577 let task2 = {
4578 let cache2 = cache.clone();
4579 async move {
4580 sleep(Duration::from_millis(100)).await;
4581 cache2
4582 .entry_by_ref(&KEY)
4583 .and_compute_with(|maybe_entry| async move {
4584 let entry = maybe_entry.expect("The entry should exist");
4585 let value = entry.into_value();
4586 assert_eq!(*value.read().await, vec![1]);
4587 sleep(Duration::from_millis(200)).await;
4588 value.write().await.push(2);
4589 compute::Op::Put(value)
4590 })
4591 .await
4592 }
4593 };
4594
4595 let task3 = {
4596 let cache3 = cache.clone();
4597 async move {
4598 sleep(Duration::from_millis(300)).await;
4599 cache3
4600 .entry(KEY)
4601 .and_compute_with(|maybe_entry| async move {
4602 let entry = maybe_entry.expect("The entry should exist");
4603 let value = entry.into_value();
4604 assert_eq!(*value.read().await, vec![1, 2]);
4605 sleep(Duration::from_millis(200)).await;
4606 compute::Op::Remove
4607 })
4608 .await
4609 }
4610 };
4611
4612 let task4 = {
4613 let cache4 = cache.clone();
4614 async move {
4615 sleep(Duration::from_millis(500)).await;
4616 cache4
4617 .entry(KEY)
4618 .and_compute_with(|maybe_entry| async move {
4619 assert!(maybe_entry.is_none());
4620 sleep(Duration::from_millis(200)).await;
4621 compute::Op::Nop
4622 })
4623 .await
4624 }
4625 };
4626
4627 let task5 = {
4628 let cache5 = cache.clone();
4629 async move {
4630 sleep(Duration::from_millis(700)).await;
4631 cache5
4632 .entry_by_ref(&KEY)
4633 .and_compute_with(|maybe_entry| async move {
4634 assert!(maybe_entry.is_none());
4635 sleep(Duration::from_millis(200)).await;
4636 compute::Op::Put(Arc::new(RwLock::new(vec![5])))
4637 })
4638 .await
4639 }
4640 };
4641
4642 let task6 = {
4643 let cache6 = cache.clone();
4644 async move {
4645 sleep(Duration::from_millis(900)).await;
4646 cache6
4647 .entry_by_ref(&KEY)
4648 .and_compute_with(|maybe_entry| async move {
4649 let entry = maybe_entry.expect("The entry should exist");
4650 let value = entry.into_value();
4651 assert_eq!(*value.read().await, vec![5]);
4652 sleep(Duration::from_millis(100)).await;
4653 compute::Op::Nop
4654 })
4655 .await
4656 }
4657 };
4658
4659 let (res1, res2, res3, res4, res5, res6) =
4660 futures_util::join!(task1, task2, task3, task4, task5, task6);
4661
4662 let compute::CompResult::Inserted(entry) = res1 else {
4663 panic!("Expected `Inserted`. Got {res1:?}")
4664 };
4665 assert_eq!(
4666 *entry.into_value().read().await,
4667 vec![1, 2] // The same Vec was modified by task2.
4668 );
4669
4670 let compute::CompResult::ReplacedWith(entry) = res2 else {
4671 panic!("Expected `ReplacedWith`. Got {res2:?}")
4672 };
4673 assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4674
4675 let compute::CompResult::Removed(entry) = res3 else {
4676 panic!("Expected `Removed`. Got {res3:?}")
4677 };
4678 assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4679
4680 let compute::CompResult::StillNone(key) = res4 else {
4681 panic!("Expected `StillNone`. Got {res4:?}")
4682 };
4683 assert_eq!(*key, KEY);
4684
4685 let compute::CompResult::Inserted(entry) = res5 else {
4686 panic!("Expected `Inserted`. Got {res5:?}")
4687 };
4688 assert_eq!(*entry.into_value().read().await, vec![5]);
4689
4690 let compute::CompResult::Unchanged(entry) = res6 else {
4691 panic!("Expected `Unchanged`. Got {res6:?}")
4692 };
4693 assert_eq!(*entry.into_value().read().await, vec![5]);
4694
4695 assert!(cache.is_waiter_map_empty());
4696 }
4697
4698 #[tokio::test]
4699 async fn try_compute_with() {
4700 use crate::ops::compute;
4701 use tokio::sync::RwLock;
4702
4703 let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
4704 const KEY: u32 = 0;
4705
4706 // Spawn four async tasks to call `and_try_compute_with` for the same key.
4707 // Ensure the key-level lock is working by verifying the value after all
4708 // tasks finish.
4709 //
4710 // | | task 1 | task 2 | task 3 | task 4 |
4711 // |---------|------------|---------------|------------|------------|
4712 // | 0 ms | get none | | | |
4713 // | 100 ms | | blocked | | |
4714 // | 200 ms | insert [1] | | | |
4715 // | | | get [1] | | |
4716 // | 300 ms | | | blocked | |
4717 // | 400 ms | | insert [1, 2] | | |
4718 // | | | | get [1, 2] | |
4719 // | 500 ms | | | | blocked |
4720 // | 600 ms | | | err | |
4721 // | | | | | get [1, 2] |
4722 // | 700 ms | | | | remove |
4723 //
4724         // This test is shorter than the `compute_with` test because this one omits the
4725         // `Nop` cases.
4726
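        // Note: when a closure returns `Err`, the error is returned to the caller and
        // the cached entry is left untouched, which is why task4 still sees `[1, 2]`
        // after task3's error.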
4727 let task1 = {
4728 let cache1 = cache.clone();
4729 async move {
4730 cache1
4731 .entry(KEY)
4732 .and_try_compute_with(|maybe_entry| async move {
4733 sleep(Duration::from_millis(200)).await;
4734 assert!(maybe_entry.is_none());
4735 Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
4736 })
4737 .await
4738 }
4739 };
4740
4741 let task2 = {
4742 let cache2 = cache.clone();
4743 async move {
4744 sleep(Duration::from_millis(100)).await;
4745 cache2
4746 .entry_by_ref(&KEY)
4747 .and_try_compute_with(|maybe_entry| async move {
4748 let entry = maybe_entry.expect("The entry should exist");
4749 let value = entry.into_value();
4750 assert_eq!(*value.read().await, vec![1]);
4751 sleep(Duration::from_millis(200)).await;
4752 value.write().await.push(2);
4753 Ok(compute::Op::Put(value)) as Result<_, ()>
4754 })
4755 .await
4756 }
4757 };
4758
4759 let task3 = {
4760 let cache3 = cache.clone();
4761 async move {
4762 sleep(Duration::from_millis(300)).await;
4763 cache3
4764 .entry(KEY)
4765 .and_try_compute_with(|maybe_entry| async move {
4766 let entry = maybe_entry.expect("The entry should exist");
4767 let value = entry.into_value();
4768 assert_eq!(*value.read().await, vec![1, 2]);
4769 sleep(Duration::from_millis(200)).await;
4770 Err(())
4771 })
4772 .await
4773 }
4774 };
4775
4776 let task4 = {
4777 let cache4 = cache.clone();
4778 async move {
4779 sleep(Duration::from_millis(500)).await;
4780 cache4
4781 .entry(KEY)
4782 .and_try_compute_with(|maybe_entry| async move {
4783 let entry = maybe_entry.expect("The entry should exist");
4784 let value = entry.into_value();
4785 assert_eq!(*value.read().await, vec![1, 2]);
4786 sleep(Duration::from_millis(100)).await;
4787 Ok(compute::Op::Remove) as Result<_, ()>
4788 })
4789 .await
4790 }
4791 };
4792
4793 let (res1, res2, res3, res4) = futures_util::join!(task1, task2, task3, task4);
4794
4795 let Ok(compute::CompResult::Inserted(entry)) = res1 else {
4796 panic!("Expected `Inserted`. Got {res1:?}")
4797 };
4798 assert_eq!(
4799 *entry.into_value().read().await,
4800 vec![1, 2] // The same Vec was modified by task2.
4801 );
4802
4803 let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
4804 panic!("Expected `ReplacedWith`. Got {res2:?}")
4805 };
4806 assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4807
4808 assert!(res3.is_err());
4809
4810 let Ok(compute::CompResult::Removed(entry)) = res4 else {
4811 panic!("Expected `Removed`. Got {res4:?}")
4812 };
4813 assert_eq!(
4814 *entry.into_value().read().await,
4815 vec![1, 2] // Removed value.
4816 );
4817
4818 assert!(cache.is_waiter_map_empty());
4819 }
4820
4821 #[tokio::test]
4822 // https://github.com/moka-rs/moka/issues/43
4823 async fn handle_panic_in_get_with() {
4824 use tokio::time::{sleep, Duration};
4825
4826 let cache = Cache::new(16);
4827 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4828 {
4829 let cache_ref = cache.clone();
4830 let semaphore_ref = semaphore.clone();
4831 tokio::task::spawn(async move {
4832 let _ = cache_ref
4833 .get_with(1, async move {
4834 semaphore_ref.add_permits(1);
4835 sleep(Duration::from_millis(50)).await;
4836                         panic!("Panic during get_with");
4837 })
4838 .await;
4839 });
4840 }
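        // Wait until the spawned task has started evaluating its init future (it adds a
        // permit right away). That future then panics, so the `get_with` below ends up
        // evaluating its own future and caching 5.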
4841 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4842 assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4843 }
4844
4845 #[tokio::test]
4846 // https://github.com/moka-rs/moka/issues/43
4847 async fn handle_panic_in_try_get_with() {
4848 use tokio::time::{sleep, Duration};
4849
4850 let cache = Cache::new(16);
4851 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4852 {
4853 let cache_ref = cache.clone();
4854 let semaphore_ref = semaphore.clone();
4855 tokio::task::spawn(async move {
4856 let _ = cache_ref
4857 .try_get_with(1, async move {
4858 semaphore_ref.add_permits(1);
4859 sleep(Duration::from_millis(50)).await;
4860 panic!("Panic during try_get_with");
4861 })
4862 .await as Result<_, Arc<Infallible>>;
4863 });
4864 }
4865 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4866 assert_eq!(
4867 cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4868 Ok(5)
4869 );
4870
4871 assert!(cache.is_waiter_map_empty());
4872 }
4873
4874 #[tokio::test]
4875 // https://github.com/moka-rs/moka/issues/59
4876 async fn abort_get_with() {
4877 use tokio::time::{sleep, Duration};
4878
4879 let cache = Cache::new(16);
4880 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4881
4882 let handle;
4883 {
4884 let cache_ref = cache.clone();
4885 let semaphore_ref = semaphore.clone();
4886
4887 handle = tokio::task::spawn(async move {
4888 let _ = cache_ref
4889 .get_with(1, async move {
4890 semaphore_ref.add_permits(1);
4891 sleep(Duration::from_millis(50)).await;
4892 unreachable!();
4893 })
4894 .await;
4895 });
4896 }
4897
4898 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4899 handle.abort();
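        // Aborting the task drops the in-flight init future. The `get_with` below must
        // not get stuck waiting on the aborted initializer; it evaluates its own future
        // and caches 5.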
4900
4901 assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4902
4903 assert!(cache.is_waiter_map_empty());
4904 }
4905
4906 #[tokio::test]
4907 // https://github.com/moka-rs/moka/issues/59
4908 async fn abort_try_get_with() {
4909 use tokio::time::{sleep, Duration};
4910
4911 let cache = Cache::new(16);
4912 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4913
4914 let handle;
4915 {
4916 let cache_ref = cache.clone();
4917 let semaphore_ref = semaphore.clone();
4918
4919 handle = tokio::task::spawn(async move {
4920 let _ = cache_ref
4921 .try_get_with(1, async move {
4922 semaphore_ref.add_permits(1);
4923 sleep(Duration::from_millis(50)).await;
4924 unreachable!();
4925 })
4926 .await as Result<_, Arc<Infallible>>;
4927 });
4928 }
4929
4930 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4931 handle.abort();
4932
4933 assert_eq!(
4934 cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4935 Ok(5)
4936 );
4937
4938 assert!(cache.is_waiter_map_empty());
4939 }
4940
4941 #[tokio::test]
4942 async fn test_removal_notifications() {
4943 // The following `Vec`s will hold actual and expected notifications.
4944 let actual = Arc::new(Mutex::new(Vec::new()));
4945 let mut expected = Vec::new();
4946
4947 // Create an eviction listener.
4948 let a1 = Arc::clone(&actual);
4949 let listener = move |k, v, cause| -> ListenerFuture {
4950 let a2 = Arc::clone(&a1);
4951 async move {
4952 a2.lock().await.push((k, v, cause));
4953 }
4954 .boxed()
4955 };
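        // Note: `async_eviction_listener` expects the closure to return a
        // `ListenerFuture` (a pinned, boxed future), hence the `.boxed()` call above.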
4956
4957 // Create a cache with the eviction listener.
4958 let mut cache = Cache::builder()
4959 .max_capacity(3)
4960 .async_eviction_listener(listener)
4961 .build();
4962 cache.reconfigure_for_testing().await;
4963
4964 // Make the cache exterior immutable.
4965 let cache = cache;
4966
4967 cache.insert('a', "alice").await;
4968 cache.invalidate(&'a').await;
4969 expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
4970
4971 cache.run_pending_tasks().await;
4972 assert_eq!(cache.entry_count(), 0);
4973
4974 cache.insert('b', "bob").await;
4975 cache.insert('c', "cathy").await;
4976 cache.insert('d', "david").await;
4977 cache.run_pending_tasks().await;
4978 assert_eq!(cache.entry_count(), 3);
4979
4980 // This will be rejected due to the size constraint.
4981 cache.insert('e', "emily").await;
4982 expected.push((Arc::new('e'), "emily", RemovalCause::Size));
4983 cache.run_pending_tasks().await;
4984 assert_eq!(cache.entry_count(), 3);
4985
4986 // Raise the popularity of 'e' so it will be accepted next time.
4987 cache.get(&'e').await;
4988 cache.run_pending_tasks().await;
4989
4990 // Retry.
4991 cache.insert('e', "eliza").await;
4992 // and the LRU entry will be evicted.
4993 expected.push((Arc::new('b'), "bob", RemovalCause::Size));
4994 cache.run_pending_tasks().await;
4995 assert_eq!(cache.entry_count(), 3);
4996
4997 // Replace an existing entry.
4998 cache.insert('d', "dennis").await;
4999 expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
5000 cache.run_pending_tasks().await;
5001 assert_eq!(cache.entry_count(), 3);
5002
5003 verify_notification_vec(&cache, actual, &expected).await;
5004 }
5005
5006 #[tokio::test]
5007 async fn test_removal_notifications_with_updates() {
5008 // The following `Vec`s will hold actual and expected notifications.
5009 let actual = Arc::new(Mutex::new(Vec::new()));
5010 let mut expected = Vec::new();
5011
5012 // Create an eviction listener.
5013 let a1 = Arc::clone(&actual);
5014 let listener = move |k, v, cause| -> ListenerFuture {
5015 let a2 = Arc::clone(&a1);
5016 async move {
5017 a2.lock().await.push((k, v, cause));
5018 }
5019 .boxed()
5020 };
5021
5022 let (clock, mock) = Clock::mock();
5023
5024 // Create a cache with the eviction listener and also TTL and TTI.
5025 let mut cache = Cache::builder()
5026 .async_eviction_listener(listener)
5027 .time_to_live(Duration::from_secs(7))
5028 .time_to_idle(Duration::from_secs(5))
5029 .clock(clock)
5030 .build();
5031 cache.reconfigure_for_testing().await;
5032
5033 // Make the cache exterior immutable.
5034 let cache = cache;
5035
5036 cache.insert("alice", "a0").await;
5037 cache.run_pending_tasks().await;
5038
5039 // Now alice (a0) has been expired by the idle timeout (TTI).
5040 mock.increment(Duration::from_secs(6));
5041 expected.push((Arc::new("alice"), "a0", RemovalCause::Expired));
5042 assert_eq!(cache.get(&"alice").await, None);
5043
5044         // We have not run the pending tasks after the expiration of alice (a0),
5045         // so it is still in the cache.
5046 assert_eq!(cache.entry_count(), 1);
5047
5048 // Re-insert alice with a different value. Since alice (a0) is still
5049 // in the cache, this is actually a replace operation rather than an
5050 // insert operation. We want to verify that the RemovalCause of a0 is
5051 // Expired, not Replaced.
5052 cache.insert("alice", "a1").await;
5053 cache.run_pending_tasks().await;
5054
5055 mock.increment(Duration::from_secs(4));
5056 assert_eq!(cache.get(&"alice").await, Some("a1"));
5057 cache.run_pending_tasks().await;
5058
5059 // Now alice has been expired by time-to-live (TTL).
5060 mock.increment(Duration::from_secs(4));
5061 expected.push((Arc::new("alice"), "a1", RemovalCause::Expired));
5062 assert_eq!(cache.get(&"alice").await, None);
5063
5064 // But, again, it is still in the cache.
5065 assert_eq!(cache.entry_count(), 1);
5066
5067 // Re-insert alice with a different value and verify that the
5068 // RemovalCause of a1 is Expired (not Replaced).
5069 cache.insert("alice", "a2").await;
5070 cache.run_pending_tasks().await;
5071
5072 assert_eq!(cache.entry_count(), 1);
5073
5074 // Now alice (a2) has been expired by the idle timeout.
5075 mock.increment(Duration::from_secs(6));
5076 expected.push((Arc::new("alice"), "a2", RemovalCause::Expired));
5077 assert_eq!(cache.get(&"alice").await, None);
5078 assert_eq!(cache.entry_count(), 1);
5079
5080 // This invalidate will internally remove alice (a2).
5081 cache.invalidate(&"alice").await;
5082 cache.run_pending_tasks().await;
5083 assert_eq!(cache.entry_count(), 0);
5084
5085 // Re-insert, and this time, make it expired by the TTL.
5086 cache.insert("alice", "a3").await;
5087 cache.run_pending_tasks().await;
5088 mock.increment(Duration::from_secs(4));
5089 assert_eq!(cache.get(&"alice").await, Some("a3"));
5090 cache.run_pending_tasks().await;
5091 mock.increment(Duration::from_secs(4));
5092 expected.push((Arc::new("alice"), "a3", RemovalCause::Expired));
5093 assert_eq!(cache.get(&"alice").await, None);
5094 assert_eq!(cache.entry_count(), 1);
5095
5096         // This invalidate will internally remove alice (a3).
5097 cache.invalidate(&"alice").await;
5098 cache.run_pending_tasks().await;
5099 assert_eq!(cache.entry_count(), 0);
5100
5101 verify_notification_vec(&cache, actual, &expected).await;
5102 }
5103
5104 // When the eviction listener is not set, calling `run_pending_tasks` once should
5105 // evict all entries that can be removed.
5106 #[tokio::test]
5107 async fn no_batch_size_limit_on_eviction() {
5108 const MAX_CAPACITY: u64 = 20;
5109
5110 const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
5111 const MAX_LOG_SYNC_REPEATS: u32 = 1;
5112 const EVICTION_BATCH_SIZE: u32 = 1;
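        // With an eviction batch size of 1 and a single log-sync repeat, a batch
        // limit would normally cap how much work one `run_pending_tasks` call can
        // do. This test checks that the limit is not applied when no eviction
        // listener is set, so a single call can still evict every LRU victim.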
5113
5114 let hk_conf = HousekeeperConfig::new(
5115 // Timeout should be ignored when the eviction listener is not provided.
5116 Some(EVICTION_TIMEOUT),
5117 Some(MAX_LOG_SYNC_REPEATS),
5118 Some(EVICTION_BATCH_SIZE),
5119 );
5120
5121 // Create a cache with the LRU policy.
5122 let mut cache = Cache::builder()
5123 .max_capacity(MAX_CAPACITY)
5124 .eviction_policy(EvictionPolicy::lru())
5125 .housekeeper_config(hk_conf)
5126 .build();
5127 cache.reconfigure_for_testing().await;
5128
5129 // Make the cache exterior immutable.
5130 let cache = cache;
5131
5132 // Fill the cache.
5133 for i in 0..MAX_CAPACITY {
5134 let v = format!("v{i}");
5135             cache.insert(i, v).await;
5136         }
5137         // The entry count should still be 0 because we have not called
5138         // `run_pending_tasks` yet.
5139 assert_eq!(cache.entry_count(), 0);
5140
5141 cache.run_pending_tasks().await;
5142 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5143
5144 // Insert more items to the cache.
5145 for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5146 let v = format!("v{i}");
5147             cache.insert(i, v).await;
5148         }
5149         // The entry count should not change because we have not called
5150         // `run_pending_tasks` yet.
5151 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5152 // Both old and new keys should exist.
5153 assert!(cache.contains_key(&0)); // old
5154 assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
5155 assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new
5156
5157 // Process the remaining write op logs (there should be MAX_CAPACITY logs),
5158 // and evict the LRU entries.
5159 cache.run_pending_tasks().await;
5160 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5161
5162 // Now all the old keys should be gone.
5163 assert!(!cache.contains_key(&0));
5164 assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
5165 // And the new keys should exist.
5166 assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
5167 }
5168
5169 #[tokio::test]
5170 async fn slow_eviction_listener() {
5171 const MAX_CAPACITY: u64 = 20;
5172
5173 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
5174 const LISTENER_DELAY: Duration = Duration::from_millis(11);
5175 const MAX_LOG_SYNC_REPEATS: u32 = 1;
5176 const EVICTION_BATCH_SIZE: u32 = 1;
5177
5178 let hk_conf = HousekeeperConfig::new(
5179 Some(EVICTION_TIMEOUT),
5180 Some(MAX_LOG_SYNC_REPEATS),
5181 Some(EVICTION_BATCH_SIZE),
5182 );
5183
5184 let (clock, mock) = Clock::mock();
5185 let listener_call_count = Arc::new(AtomicU8::new(0));
5186 let lcc = Arc::clone(&listener_call_count);
5187
5188         // A slow eviction listener that spends `LISTENER_DELAY` to process each
5189         // removal notification.
5190 let listener = move |_k, _v, _cause| {
5191 mock.increment(LISTENER_DELAY);
5192 lcc.fetch_add(1, Ordering::AcqRel);
5193 };
5194
5195 // Create a cache with the LRU policy.
5196 let mut cache = Cache::builder()
5197 .max_capacity(MAX_CAPACITY)
5198 .eviction_policy(EvictionPolicy::lru())
5199 .eviction_listener(listener)
5200 .housekeeper_config(hk_conf)
5201 .clock(clock)
5202 .build();
5203 cache.reconfigure_for_testing().await;
5204
5205 // Make the cache exterior immutable.
5206 let cache = cache;
5207
5208 // Fill the cache.
5209 for i in 0..MAX_CAPACITY {
5210 let v = format!("v{i}");
5211             cache.insert(i, v).await;
5212         }
5213         // The entry count should still be 0 because we have not called
5214         // `run_pending_tasks` yet.
5215 assert_eq!(cache.entry_count(), 0);
5216
5217 cache.run_pending_tasks().await;
5218 assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
5219 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5220
5221 // Insert more items to the cache.
5222 for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5223 let v = format!("v{i}");
5224             cache.insert(i, v).await;
5225 }
5226 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5227
5228 cache.run_pending_tasks().await;
5229         // Because of the slow listener, the cache should now be over capacity.
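        // Each notification advances the mock clock by LISTENER_DELAY (11 ms), so
        // the housekeeper appears to hit the 30 ms EVICTION_TIMEOUT after roughly
        // three notifications per `run_pending_tasks` call.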
5230 let mut expected_call_count = 3;
5231 assert_eq!(
5232 listener_call_count.load(Ordering::Acquire) as u64,
5233 expected_call_count
5234 );
5235 assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
5236
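        // Keep running pending tasks; each call should process up to three more
        // notifications until all MAX_CAPACITY LRU victims have been evicted.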
5237 loop {
5238 cache.run_pending_tasks().await;
5239
5240 expected_call_count += 3;
5241 if expected_call_count > MAX_CAPACITY {
5242 expected_call_count = MAX_CAPACITY;
5243 }
5244
5245 let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
5246 assert_eq!(actual_count, expected_call_count);
5247 let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
5248 assert_eq!(cache.entry_count(), expected_entry_count);
5249
5250 if expected_call_count >= MAX_CAPACITY {
5251 break;
5252 }
5253 }
5254
5255 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5256 }
5257
5258 // NOTE: To enable the panic logging, run the following command:
5259 //
5260 // RUST_LOG=moka=info cargo test --features 'future, logging' -- \
5261 // future::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
5262 //
5263 #[tokio::test]
5264 async fn recover_from_panicking_eviction_listener() {
5265 #[cfg(feature = "logging")]
5266 let _ = env_logger::builder().is_test(true).try_init();
5267
5268 // The following `Vec`s will hold actual and expected notifications.
5269 let actual = Arc::new(Mutex::new(Vec::new()));
5270 let mut expected = Vec::new();
5271
5272         // Create an eviction listener that panics when it sees
5273         // the value "panic now!".
5274 let a1 = Arc::clone(&actual);
5275 let listener = move |k, v, cause| -> ListenerFuture {
5276 let a2 = Arc::clone(&a1);
5277 async move {
5278 if v == "panic now!" {
5279 panic!("Panic now!");
5280 }
5281 a2.lock().await.push((k, v, cause));
5282 }
5283 .boxed()
5284 };
5285
5286 // Create a cache with the eviction listener.
5287 let mut cache = Cache::builder()
5288 .name("My Future Cache")
5289 .async_eviction_listener(listener)
5290 .build();
5291 cache.reconfigure_for_testing().await;
5292
5293 // Make the cache exterior immutable.
5294 let cache = cache;
5295
5296 // Insert an okay value.
5297 cache.insert("alice", "a0").await;
5298 cache.run_pending_tasks().await;
5299
5300 // Insert a value that will cause the eviction listener to panic.
5301 cache.insert("alice", "panic now!").await;
5302 expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
5303 cache.run_pending_tasks().await;
5304
5305         // Insert an okay value. This will replace the previous
5306         // value "panic now!", so the eviction listener will panic.
5307 cache.insert("alice", "a2").await;
5308 cache.run_pending_tasks().await;
5309         // After the panic, no more removal notifications should be sent.
5310
5311 // Invalidate the okay value.
5312 cache.invalidate(&"alice").await;
5313 cache.run_pending_tasks().await;
5314
5315 verify_notification_vec(&cache, actual, &expected).await;
5316 }
5317
5318 #[tokio::test]
5319 async fn cancel_future_while_running_pending_tasks() {
5320 use crate::future::FutureExt;
5321 use futures_util::future::poll_immediate;
5322 use tokio::task::yield_now;
5323
5324 let listener_initiation_count: Arc<AtomicU32> = Default::default();
5325 let listener_completion_count: Arc<AtomicU32> = Default::default();
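        // These counters track how many times the async eviction listener was
        // started and how many times it ran to completion, so the test can observe
        // a listener that was cancelled mid-flight.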
5326
5327 let listener = {
5328 // Variables to capture.
5329 let init_count = Arc::clone(&listener_initiation_count);
5330 let comp_count = Arc::clone(&listener_completion_count);
5331
5332 // Our eviction listener closure.
5333 move |_k, _v, _r| {
5334 init_count.fetch_add(1, Ordering::AcqRel);
5335 let comp_count1 = Arc::clone(&comp_count);
5336
5337 async move {
5338 yield_now().await;
5339 comp_count1.fetch_add(1, Ordering::AcqRel);
5340 }
5341 .boxed()
5342 }
5343 };
5344
5345 let (clock, mock) = Clock::mock();
5346
5347 let mut cache: Cache<u32, u32> = Cache::builder()
5348 .time_to_live(Duration::from_millis(10))
5349 .async_eviction_listener(listener)
5350 .clock(clock)
5351 .build();
5352
5353 cache.reconfigure_for_testing().await;
5354
5355 // Make the cache exterior immutable.
5356 let cache = cache;
5357
5358 cache.insert(1, 1).await;
5359 assert_eq!(cache.run_pending_tasks_initiation_count(), 0);
5360 assert_eq!(cache.run_pending_tasks_completion_count(), 0);
5361
5362 // Key 1 is not yet expired.
5363 mock.increment(Duration::from_millis(7));
5364
5365 cache.run_pending_tasks().await;
5366 assert_eq!(cache.run_pending_tasks_initiation_count(), 1);
5367 assert_eq!(cache.run_pending_tasks_completion_count(), 1);
5368 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 0);
5369 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5370
5371 // Now key 1 is expired, so the eviction listener should be called when we
5372 // call run_pending_tasks() and poll the returned future.
5373 mock.increment(Duration::from_millis(7));
5374
5375 let fut = cache.run_pending_tasks();
5376 // Poll the fut only once, and drop it. The fut should not be completed (so
5377 // it is cancelled) because the eviction listener performed a yield_now().
5378 assert!(poll_immediate(fut).await.is_none());
5379
5380 // The task is initiated but not completed.
5381 assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
5382 assert_eq!(cache.run_pending_tasks_completion_count(), 1);
5383 // The listener is initiated but not completed.
5384 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5385 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5386
5387 // This will resume the task and the listener, and continue polling
5388 // until complete.
5389 cache.run_pending_tasks().await;
5390 // Now the task is completed.
5391 assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
5392 assert_eq!(cache.run_pending_tasks_completion_count(), 2);
5393 // Now the listener is completed.
5394 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5395 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5396 }
5397
5398 #[tokio::test]
5399 async fn cancel_future_while_calling_eviction_listener() {
5400 use crate::future::FutureExt;
5401 use futures_util::future::poll_immediate;
5402 use tokio::task::yield_now;
5403
5404 let listener_initiation_count: Arc<AtomicU32> = Default::default();
5405 let listener_completion_count: Arc<AtomicU32> = Default::default();
5406
5407 let listener = {
5408 // Variables to capture.
5409 let init_count = Arc::clone(&listener_initiation_count);
5410 let comp_count = Arc::clone(&listener_completion_count);
5411
5412 // Our eviction listener closure.
5413 move |_k, _v, _r| {
5414 init_count.fetch_add(1, Ordering::AcqRel);
5415 let comp_count1 = Arc::clone(&comp_count);
5416
5417 async move {
5418 yield_now().await;
5419 comp_count1.fetch_add(1, Ordering::AcqRel);
5420 }
5421 .boxed()
5422 }
5423 };
5424
5425 let mut cache: Cache<u32, u32> = Cache::builder()
5426 .time_to_live(Duration::from_millis(10))
5427 .async_eviction_listener(listener)
5428 .build();
5429
5430 cache.reconfigure_for_testing().await;
5431
5432 // Make the cache exterior immutable.
5433 let cache = cache;
5434
5435 // ------------------------------------------------------------
5436 // Interrupt the eviction listener while calling `insert`
5437 // ------------------------------------------------------------
5438
5439 cache.insert(1, 1).await;
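        // Replacing the value for key 1 should notify the eviction listener
        // (RemovalCause::Replaced) from inside the `insert` call below.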
5440
5441 let fut = cache.insert(1, 2);
5442 // Poll the fut only once, and drop it. The fut should not be completed (so
5443 // it is cancelled) because the eviction listener performed a yield_now().
5444 assert!(poll_immediate(fut).await.is_none());
5445
5446 // The listener is initiated but not completed.
5447 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5448 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5449
5450 // This will call retry_interrupted_ops() and resume the interrupted
5451 // listener.
5452 cache.run_pending_tasks().await;
5453 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5454 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5455
5456 // ------------------------------------------------------------
5457 // Interrupt the eviction listener while calling `invalidate`
5458 // ------------------------------------------------------------
5459
5460 let fut = cache.invalidate(&1);
5461 // Cancel the fut after one poll.
5462 assert!(poll_immediate(fut).await.is_none());
5463 // The listener is initiated but not completed.
5464 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5465 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5466
5467 // This will call retry_interrupted_ops() and resume the interrupted
5468 // listener.
5469 cache.get(&99).await;
5470 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5471 assert_eq!(listener_completion_count.load(Ordering::Acquire), 2);
5472
5473 // ------------------------------------------------------------
5474 // Ensure retry_interrupted_ops() is called
5475 // ------------------------------------------------------------
5476
5477 // Repeat the same test with `insert`, but this time, call different methods
5478 // to ensure retry_interrupted_ops() is called.
5479 let prepare = || async {
5480 cache.invalidate(&1).await;
5481
5482 // Reset the counters.
5483 listener_initiation_count.store(0, Ordering::Release);
5484 listener_completion_count.store(0, Ordering::Release);
5485
5486 cache.insert(1, 1).await;
5487
5488 let fut = cache.insert(1, 2);
5489 // Poll the fut only once, and drop it. The fut should not be completed (so
5490 // it is cancelled) because the eviction listener performed a yield_now().
5491 assert!(poll_immediate(fut).await.is_none());
5492
5493 // The listener is initiated but not completed.
5494 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5495 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5496 };
5497
5498 // Methods to test:
5499 //
5500 // - run_pending_tasks (Already tested in a previous test)
5501 // - get (Already tested in a previous test)
5502 // - insert
5503 // - invalidate
5504 // - remove
5505
5506 // insert
5507 prepare().await;
5508 cache.insert(99, 99).await;
5509 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5510 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5511
5512 // invalidate
5513 prepare().await;
5514 cache.invalidate(&88).await;
5515 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5516 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5517
5518 // remove
5519 prepare().await;
5520 cache.remove(&77).await;
5521 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5522 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5523 }
5524
5525 #[tokio::test]
5526 async fn cancel_future_while_scheduling_write_op() {
5527 use futures_util::future::poll_immediate;
5528
5529 let mut cache: Cache<u32, u32> = Cache::builder().build();
5530 cache.reconfigure_for_testing().await;
5531
5532 // Make the cache exterior immutable.
5533 let cache = cache;
5534
5535 // --------------------------------------------------------------
5536 // Interrupt `insert` while blocking in `schedule_write_op`
5537 // --------------------------------------------------------------
5538
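        // `schedule_write_op_should_block` is a test-only switch; while it is set,
        // the internal `schedule_write_op` call keeps waiting, so the `insert`
        // future below cannot complete.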
5539 cache
5540 .schedule_write_op_should_block
5541 .store(true, Ordering::Release);
5542 let fut = cache.insert(1, 1);
5543         // Poll the fut only once, and drop it. The fut should not be completed (so
5544         // it is cancelled) because schedule_write_op should be awaiting a lock.
5545 assert!(poll_immediate(fut).await.is_none());
5546
5547 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5548 assert_eq!(cache.base.write_op_ch.len(), 0);
5549
5550 // This should retry the interrupted operation.
5551 cache
5552 .schedule_write_op_should_block
5553 .store(false, Ordering::Release);
5554 cache.get(&99).await;
5555 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5556 assert_eq!(cache.base.write_op_ch.len(), 1);
5557
5558 cache.run_pending_tasks().await;
5559 assert_eq!(cache.base.write_op_ch.len(), 0);
5560
5561 // --------------------------------------------------------------
5562 // Interrupt `invalidate` while blocking in `schedule_write_op`
5563 // --------------------------------------------------------------
5564
5565 cache
5566 .schedule_write_op_should_block
5567 .store(true, Ordering::Release);
5568 let fut = cache.invalidate(&1);
5569         // Poll the fut only once, and drop it. The fut should not be completed (so
5570         // it is cancelled) because schedule_write_op should be awaiting a lock.
5571 assert!(poll_immediate(fut).await.is_none());
5572
5573 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5574 assert_eq!(cache.base.write_op_ch.len(), 0);
5575
5576 // This should retry the interrupted operation.
5577 cache
5578 .schedule_write_op_should_block
5579 .store(false, Ordering::Release);
5580 cache.get(&99).await;
5581 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5582 assert_eq!(cache.base.write_op_ch.len(), 1);
5583
5584 cache.run_pending_tasks().await;
5585 assert_eq!(cache.base.write_op_ch.len(), 0);
5586 }
5587
5588     // This test ensures that `contains_key`, `get` and `invalidate` can take the
5589     // borrowed form `&[u8]` for keys of type `Vec<u8>`.
5590 // https://github.com/moka-rs/moka/issues/166
5591 #[tokio::test]
5592 async fn borrowed_forms_of_key() {
5593 let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5594
5595 let key = vec![1_u8];
5596 cache.insert(key.clone(), ()).await;
5597
5598 // key as &Vec<u8>
5599 let key_v: &Vec<u8> = &key;
5600 assert!(cache.contains_key(key_v));
5601 assert_eq!(cache.get(key_v).await, Some(()));
5602 cache.invalidate(key_v).await;
5603
5604 cache.insert(key, ()).await;
5605
5606 // key as &[u8]
5607 let key_s: &[u8] = &[1_u8];
5608 assert!(cache.contains_key(key_s));
5609 assert_eq!(cache.get(key_s).await, Some(()));
5610 cache.invalidate(key_s).await;
5611 }
5612
5613 #[tokio::test]
5614 async fn drop_value_immediately_after_eviction() {
5615 use crate::common::test_utils::{Counters, Value};
5616
5617 const MAX_CAPACITY: u32 = 500;
5618 const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
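        // KEYS is 20% larger than MAX_CAPACITY, so about 100 entries should be
        // evicted for size; the listener below counts those size evictions and the
        // later explicit invalidations separately.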
5619
5620 let counters = Arc::new(Counters::default());
5621 let counters1 = Arc::clone(&counters);
5622
5623 let listener = move |_k, _v, cause| match cause {
5624 RemovalCause::Size => counters1.incl_evicted(),
5625 RemovalCause::Explicit => counters1.incl_invalidated(),
5626 _ => (),
5627 };
5628
5629 let mut cache = Cache::builder()
5630 .max_capacity(MAX_CAPACITY as u64)
5631 .eviction_listener(listener)
5632 .build();
5633 cache.reconfigure_for_testing().await;
5634
5635 // Make the cache exterior immutable.
5636 let cache = cache;
5637
5638 for key in 0..KEYS {
5639 let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
5640 cache.insert(key, value).await;
5641 counters.incl_inserted();
5642 cache.run_pending_tasks().await;
5643 }
5644
5645 let eviction_count = KEYS - MAX_CAPACITY;
5646
5647 // Retries will be needed when testing in a QEMU VM.
5648 const MAX_RETRIES: usize = 5;
5649 let mut retries = 0;
5650 loop {
5651 // Ensure all scheduled notifications have been processed.
5652 std::thread::sleep(Duration::from_millis(500));
5653
5654 if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
5655 if retries <= MAX_RETRIES {
5656 retries += 1;
5657 cache.run_pending_tasks().await;
5658 continue;
5659 } else {
5660 assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
5661 assert_eq!(
5662 counters.value_dropped(),
5663 eviction_count,
5664 "Retries exhausted"
5665 );
5666 }
5667 }
5668
5669 assert_eq!(counters.inserted(), KEYS, "inserted");
5670 assert_eq!(counters.value_created(), KEYS, "value_created");
5671 assert_eq!(counters.evicted(), eviction_count, "evicted");
5672 assert_eq!(counters.invalidated(), 0, "invalidated");
5673 assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
5674
5675 break;
5676 }
5677
5678 for key in 0..KEYS {
5679 cache.invalidate(&key).await;
5680 cache.run_pending_tasks().await;
5681 }
5682
5683 let mut retries = 0;
5684 loop {
5685 // Ensure all scheduled notifications have been processed.
5686 std::thread::sleep(Duration::from_millis(500));
5687
5688 if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
5689 if retries <= MAX_RETRIES {
5690 retries += 1;
5691 cache.run_pending_tasks().await;
5692 continue;
5693 } else {
5694 assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
5695 assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
5696 }
5697 }
5698
5699 assert_eq!(counters.inserted(), KEYS, "inserted");
5700 assert_eq!(counters.value_created(), KEYS, "value_created");
5701 assert_eq!(counters.evicted(), eviction_count, "evicted");
5702 assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
5703 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5704
5705 break;
5706 }
5707
5708 std::mem::drop(cache);
5709 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5710 }
5711
5712 // https://github.com/moka-rs/moka/issues/383
5713 #[tokio::test]
5714 async fn ensure_gc_runs_when_dropping_cache() {
5715 let cache = Cache::builder().build();
5716 let val = Arc::new(0);
5717 cache
5718 .get_with(1, std::future::ready(Arc::clone(&val)))
5719 .await;
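        // At this point the cache holds its own `Arc` clone of `val`; dropping the
        // cache should release it so only the local `val` reference remains.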
5720 drop(cache);
5721 assert_eq!(Arc::strong_count(&val), 1);
5722 }
5723
5724 #[tokio::test]
5725 async fn test_debug_format() {
5726 let cache = Cache::new(10);
5727 cache.insert('a', "alice").await;
5728 cache.insert('b', "bob").await;
5729 cache.insert('c', "cindy").await;
5730
5731 let debug_str = format!("{cache:?}");
5732 assert!(debug_str.starts_with('{'));
5733 assert!(debug_str.contains(r#"'a': "alice""#));
5734 assert!(debug_str.contains(r#"'b': "bob""#));
5735 assert!(debug_str.contains(r#"'c': "cindy""#));
5736 assert!(debug_str.ends_with('}'));
5737 }
5738
5739 type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5740
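    // Compares the notifications recorded by the eviction listener against the
    // expected ones, retrying `run_pending_tasks` a few times so that in-flight
    // notifications can be delivered first.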
5741 async fn verify_notification_vec<K, V, S>(
5742 cache: &Cache<K, V, S>,
5743 actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5744 expected: &[NotificationTuple<K, V>],
5745 ) where
5746 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5747 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5748 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5749 {
5750 // Retries will be needed when testing in a QEMU VM.
5751 const MAX_RETRIES: usize = 5;
5752 let mut retries = 0;
5753 loop {
5754 // Ensure all scheduled notifications have been processed.
5755 std::thread::sleep(Duration::from_millis(500));
5756
5757 let actual = &*actual.lock().await;
5758 if actual.len() != expected.len() {
5759 if retries <= MAX_RETRIES {
5760 retries += 1;
5761 cache.run_pending_tasks().await;
5762 continue;
5763 } else {
5764 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5765 }
5766 }
5767
5768 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5769 assert_eq!(actual, expected, "expected[{i}]");
5770 }
5771
5772 break;
5773 }
5774 }
5775}