moka/future/cache.rs
1use equivalent::Equivalent;
2
3use super::{
4 base_cache::BaseCache,
5 value_initializer::{InitResult, ValueInitializer},
6 CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector,
7 WriteOp,
8};
9use crate::{
10 common::{concurrent::Weigher, time::Clock, HousekeeperConfig},
11 notification::AsyncEvictionListener,
12 ops::compute::{self, CompResult},
13 policy::{EvictionPolicy, ExpirationPolicy},
14 Entry, Policy, PredicateError,
15};
16
17#[cfg(feature = "unstable-debug-counters")]
18use crate::common::concurrent::debug_counters::CacheDebugStats;
19
20use std::{
21 collections::hash_map::RandomState,
22 fmt,
23 future::Future,
24 hash::{BuildHasher, Hash},
25 pin::Pin,
26 sync::Arc,
27};
28
29#[cfg(test)]
30use std::sync::atomic::{AtomicBool, Ordering};
31
32/// A thread-safe, futures-aware concurrent in-memory cache.
33///
34/// `Cache` supports full concurrency of retrievals and a high expected concurrency
35/// for updates. It utilizes a lock-free concurrent hash table as the central
36/// key-value storage. It performs a best-effort bounding of the map using an entry
37/// replacement algorithm to determine which entries to evict when the capacity is
38/// exceeded.
39///
40/// To use this cache, enable a crate feature called "future".
41///
42/// # Table of Contents
43///
44/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
45/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
46/// - [Sharing a cache across asynchronous tasks](#sharing-a-cache-across-asynchronous-tasks)
47/// - [No lock is needed](#no-lock-is-needed)
48/// - [Hashing Algorithm](#hashing-algorithm)
49/// - [Example: Size-based Eviction](#example-size-based-eviction)
50/// - [Example: Time-based Expirations](#example-time-based-expirations)
51/// - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
52/// - [Per-entry expiration policy](#per-entry-expiration-policy)
53/// - [Example: Eviction Listener](#example-eviction-listener)
54/// - [You should avoid eviction listener to panic](#you-should-avoid-eviction-listener-to-panic)
55///
56/// # Example: `insert`, `get` and `invalidate`
57///
58/// Cache entries are manually added using [`insert`](#method.insert) of
59/// [`get_with`](#method.get_with) method, and are stored in the cache until either
60/// evicted or manually invalidated:
61///
62/// Here's an example of reading and updating a cache by using multiple asynchronous
63/// tasks with [Tokio][tokio-crate] runtime:
64///
65/// [tokio-crate]: https://crates.io/crates/tokio
66///
67///```rust
68/// // Cargo.toml
69/// //
70/// // [dependencies]
71/// // moka = { version = "0.12", features = ["future"] }
72/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
73/// // futures-util = "0.3"
74///
75/// use moka::future::Cache;
76///
77/// #[tokio::main]
78/// async fn main() {
79/// const NUM_TASKS: usize = 16;
80/// const NUM_KEYS_PER_TASK: usize = 64;
81///
82/// fn value(n: usize) -> String {
83/// format!("value {n}")
84/// }
85///
86/// // Create a cache that can store up to 10,000 entries.
87/// let cache = Cache::new(10_000);
88///
89/// // Spawn async tasks and write to and read from the cache.
90/// let tasks: Vec<_> = (0..NUM_TASKS)
91/// .map(|i| {
92/// // To share the same cache across the async tasks, clone it.
93/// // This is a cheap operation.
94/// let my_cache = cache.clone();
95/// let start = i * NUM_KEYS_PER_TASK;
96/// let end = (i + 1) * NUM_KEYS_PER_TASK;
97///
98/// tokio::spawn(async move {
99/// // Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
100/// for key in start..end {
101/// my_cache.insert(key, value(key)).await;
102/// // get() returns Option<String>, a clone of the stored value.
103/// assert_eq!(my_cache.get(&key).await, Some(value(key)));
104/// }
105///
106/// // Invalidate every 4 element of the inserted entries.
107/// for key in (start..end).step_by(4) {
108/// my_cache.invalidate(&key).await;
109/// }
110/// })
111/// })
112/// .collect();
113///
114/// // Wait for all tasks to complete.
115/// futures_util::future::join_all(tasks).await;
116///
117/// // Verify the result.
118/// for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
119/// if key % 4 == 0 {
120/// assert_eq!(cache.get(&key).await, None);
121/// } else {
122/// assert_eq!(cache.get(&key).await, Some(value(key)));
123/// }
124/// }
125/// }
126/// ```
127///
128/// If you want to atomically initialize and insert a value when the key is not
129/// present, you might want to check other insertion methods
130/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
131///
132/// # Avoiding to clone the value at `get`
133///
134/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
135/// time `get` is called for an existing key, it creates a clone of the stored value
136/// `V` and returns it. This is because the `Cache` allows concurrent updates from
137/// threads so a value stored in the cache can be dropped or replaced at any time by
138/// any other thread. `get` cannot return a reference `&V` as it is impossible to
139/// guarantee the value outlives the reference.
140///
141/// If you want to store values that will be expensive to clone, wrap them by
142/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
143/// thread-safe reference-counted pointer and its `clone()` method is cheap.
144///
145/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
146///
147/// # Sharing a cache across asynchronous tasks
148///
149/// To share a cache across async tasks (or OS threads), do one of the followings:
150///
151/// - Create a clone of the cache by calling its `clone` method and pass it to other
152/// task.
153/// - If you are using a web application framework such as Actix Web or Axum, you can
154/// store a cache in Actix Web's [`web::Data`][actix-web-data] or Axum's
155/// [shared state][axum-state-extractor], and access it from each request handler.
156/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from
157/// [once_cell][once-cell-crate] create, and set it to a `static` variable.
158///
159/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
160/// reference-counted pointers to the internal data structures.
161///
162/// [once-cell-crate]: https://crates.io/crates/once_cell
163/// [actix-web-data]: https://docs.rs/actix-web/4.3.1/actix_web/web/struct.Data.html
164/// [axum-state-extractor]: https://docs.rs/axum/latest/axum/#sharing-state-with-handlers
165///
166/// ## No lock is needed
167///
168/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
169/// by the `Cache` are considered thread-safe, and can be safely called by multiple
170/// async tasks at the same time. No lock is needed.
171///
172/// [once-cell-crate]: https://crates.io/crates/once_cell
173///
174/// # Hashing Algorithm
175///
176/// By default, `Cache` uses a hashing algorithm selected to provide resistance
177/// against HashDoS attacks. It will be the same one used by
178/// `std::collections::HashMap`, which is currently SipHash 1-3.
179///
180/// While SipHash's performance is very competitive for medium sized keys, other
181/// hashing algorithms will outperform it for small keys such as integers as well as
182/// large keys such as long strings. However those algorithms will typically not
183/// protect against attacks such as HashDoS.
184///
185/// The hashing algorithm can be replaced on a per-`Cache` basis using the
186/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
187/// Many alternative algorithms are available on crates.io, such as the
188/// [AHash][ahash-crate] crate.
189///
190/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
191/// [ahash-crate]: https://crates.io/crates/ahash
192///
193/// # Example: Size-based Eviction
194///
195/// ```rust
196/// // Cargo.toml
197/// //
198/// // [dependencies]
199/// // moka = { version = "0.12", features = ["future"] }
200/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
201/// // futures-util = "0.3"
202///
203/// use moka::future::Cache;
204///
205/// #[tokio::main]
206/// async fn main() {
207/// // Evict based on the number of entries in the cache.
208/// let cache = Cache::builder()
209/// // Up to 10,000 entries.
210/// .max_capacity(10_000)
211/// // Create the cache.
212/// .build();
213/// cache.insert(1, "one".to_string()).await;
214///
215/// // Evict based on the byte length of strings in the cache.
216/// let cache = Cache::builder()
217/// // A weigher closure takes &K and &V and returns a u32
218/// // representing the relative size of the entry.
219/// .weigher(|_key, value: &String| -> u32 {
220/// value.len().try_into().unwrap_or(u32::MAX)
221/// })
222/// // This cache will hold up to 32MiB of values.
223/// .max_capacity(32 * 1024 * 1024)
224/// .build();
225/// cache.insert(2, "two".to_string()).await;
226/// }
227/// ```
228///
229/// If your cache should not grow beyond a certain size, use the `max_capacity`
230/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
231/// will try to evict entries that have not been used recently or very often.
232///
233/// At the cache creation time, a weigher closure can be set by the `weigher` method
234/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
235/// returns a `u32` representing the relative size of the entry:
236///
237/// - If the `weigher` is _not_ set, the cache will treat each entry has the same
238/// size of `1`. This means the cache will be bounded by the number of entries.
239/// - If the `weigher` is set, the cache will call the weigher to calculate the
240/// weighted size (relative size) on an entry. This means the cache will be bounded
241/// by the total weighted size of entries.
242///
243/// Note that weighted sizes are not used when making eviction selections.
244///
245/// [builder-struct]: ./struct.CacheBuilder.html
246///
247/// # Example: Time-based Expirations
248///
249/// ## Cache-level TTL and TTI policies
250///
251/// `Cache` supports the following cache-level expiration policies:
252///
253/// - **Time to live (TTL)**: A cached entry will be expired after the specified
254/// duration past from `insert`.
255/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
256/// duration past from `get` or `insert`.
257///
258/// They are a cache-level expiration policies; all entries in the cache will have
259/// the same TTL and/or TTI durations. If you want to set different expiration
260/// durations for different entries, see the next section.
261///
262/// ```rust
263/// // Cargo.toml
264/// //
265/// // [dependencies]
266/// // moka = { version = "0.12", features = ["future"] }
267/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
268/// // futures-util = "0.3"
269///
270/// use moka::future::Cache;
271/// use std::time::Duration;
272///
273/// #[tokio::main]
274/// async fn main() {
275/// let cache = Cache::builder()
276/// // Time to live (TTL): 30 minutes
277/// .time_to_live(Duration::from_secs(30 * 60))
278/// // Time to idle (TTI): 5 minutes
279/// .time_to_idle(Duration::from_secs( 5 * 60))
280/// // Create the cache.
281/// .build();
282///
283/// // This entry will expire after 5 minutes (TTI) if there is no get().
284/// cache.insert(0, "zero").await;
285///
286/// // This get() will extend the entry life for another 5 minutes.
287/// cache.get(&0);
288///
289/// // Even though we keep calling get(), the entry will expire
290/// // after 30 minutes (TTL) from the insert().
291/// }
292/// ```
293///
294/// ## Per-entry expiration policy
295///
296/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
297///
298/// `Expiry` trait provides three callback methods:
299/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
300/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
301/// updated, one of these methods is called. These methods return an
302/// `Option<Duration>`, which is used as the expiration duration of the entry.
303///
304/// `Expiry` trait provides the default implementations of these methods, so you will
305/// implement only the methods you want to customize.
306///
307/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
308/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
309/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
310///
311/// ```rust
312/// // Cargo.toml
313/// //
314/// // [dependencies]
315/// // moka = { version = "0.12", features = ["future"] }
316/// // tokio = { version = "1", features = ["rt-multi-thread", "macros", "time" ] }
317///
318/// use moka::{future::Cache, Expiry};
319/// use std::time::{Duration, Instant};
320///
321/// // In this example, we will create a `future::Cache` with `u32` as the key, and
322/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
323/// // expiration of the value, and `String` is the application data of the value.
324///
325/// /// An enum to represent the expiration of a value.
326/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
327/// pub enum Expiration {
328/// /// The value never expires.
329/// Never,
330/// /// The value expires after a short time. (5 seconds in this example)
331/// AfterShortTime,
332/// /// The value expires after a long time. (15 seconds in this example)
333/// AfterLongTime,
334/// }
335///
336/// impl Expiration {
337/// /// Returns the duration of this expiration.
338/// pub fn as_duration(&self) -> Option<Duration> {
339/// match self {
340/// Expiration::Never => None,
341/// Expiration::AfterShortTime => Some(Duration::from_secs(5)),
342/// Expiration::AfterLongTime => Some(Duration::from_secs(15)),
343/// }
344/// }
345/// }
346///
347/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
348/// /// default implementations of three callback methods `expire_after_create`,
349/// /// `expire_after_read`, and `expire_after_update`.
350/// ///
351/// /// In this example, we only override the `expire_after_create` method.
352/// pub struct MyExpiry;
353///
354/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
355/// /// Returns the duration of the expiration of the value that was just
356/// /// created.
357/// fn expire_after_create(
358/// &self,
359/// _key: &u32,
360/// value: &(Expiration, String),
361/// _current_time: Instant,
362/// ) -> Option<Duration> {
363/// let duration = value.0.as_duration();
364/// println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
365/// duration
366/// }
367/// }
368///
369/// #[tokio::main]
370/// async fn main() {
371/// // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
372/// // eviction listener.
373/// let expiry = MyExpiry;
374///
375/// let eviction_listener = |key, _value, cause| {
376/// println!("Evicted key {key}. Cause: {cause:?}");
377/// };
378///
379/// let cache = Cache::builder()
380/// .max_capacity(100)
381/// .expire_after(expiry)
382/// .eviction_listener(eviction_listener)
383/// .build();
384///
385/// // Insert some entries into the cache with different expirations.
386/// cache
387/// .get_with(0, async { (Expiration::AfterShortTime, "a".to_string()) })
388/// .await;
389///
390/// cache
391/// .get_with(1, async { (Expiration::AfterLongTime, "b".to_string()) })
392/// .await;
393///
394/// cache
395/// .get_with(2, async { (Expiration::Never, "c".to_string()) })
396/// .await;
397///
398/// // Verify that all the inserted entries exist.
399/// assert!(cache.contains_key(&0));
400/// assert!(cache.contains_key(&1));
401/// assert!(cache.contains_key(&2));
402///
403/// // Sleep for 6 seconds. Key 0 should expire.
404/// println!("\nSleeping for 6 seconds...\n");
405/// tokio::time::sleep(Duration::from_secs(6)).await;
406/// cache.run_pending_tasks().await;
407/// println!("Entry count: {}", cache.entry_count());
408///
409/// // Verify that key 0 has been evicted.
410/// assert!(!cache.contains_key(&0));
411/// assert!(cache.contains_key(&1));
412/// assert!(cache.contains_key(&2));
413///
414/// // Sleep for 10 more seconds. Key 1 should expire.
415/// println!("\nSleeping for 10 seconds...\n");
416/// tokio::time::sleep(Duration::from_secs(10)).await;
417/// cache.run_pending_tasks().await;
418/// println!("Entry count: {}", cache.entry_count());
419///
420/// // Verify that key 1 has been evicted.
421/// assert!(!cache.contains_key(&1));
422/// assert!(cache.contains_key(&2));
423///
424/// // Manually invalidate key 2.
425/// cache.invalidate(&2).await;
426/// assert!(!cache.contains_key(&2));
427///
428/// println!("\nSleeping for a second...\n");
429/// tokio::time::sleep(Duration::from_secs(1)).await;
430/// cache.run_pending_tasks().await;
431/// println!("Entry count: {}", cache.entry_count());
432///
433/// println!("\nDone!");
434/// }
435/// ```
436///
437/// # Example: Eviction Listener
438///
439/// A `Cache` can be configured with an eviction listener, a closure that is called
440/// every time there is a cache eviction. The listener takes three parameters: the
441/// key and value of the evicted entry, and the
442/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
443/// entry was evicted.
444///
445/// An eviction listener can be used to keep other data structures in sync with the
446/// cache, for example.
447///
448/// The following example demonstrates how to use an eviction listener with
449/// time-to-live expiration to manage the lifecycle of temporary files on a
450/// filesystem. The cache stores the paths of the files, and when one of them has
451/// expired, the eviction listener will be called with the path, so it can remove the
452/// file from the filesystem.
453///
454/// ```rust
455/// // Cargo.toml
456/// //
457/// // [dependencies]
458/// // anyhow = "1.0"
459/// // uuid = { version = "1.1", features = ["v4"] }
460/// // tokio = { version = "1.18", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] }
461///
462/// use moka::{future::Cache, notification::ListenerFuture};
463/// // FutureExt trait provides the boxed method.
464/// use moka::future::FutureExt;
465///
466/// use anyhow::{anyhow, Context};
467/// use std::{
468/// io,
469/// path::{Path, PathBuf},
470/// sync::Arc,
471/// time::Duration,
472/// };
473/// use tokio::{fs, sync::RwLock};
474/// use uuid::Uuid;
475///
476/// /// The DataFileManager writes, reads and removes data files.
477/// struct DataFileManager {
478/// base_dir: PathBuf,
479/// file_count: usize,
480/// }
481///
482/// impl DataFileManager {
483/// fn new(base_dir: PathBuf) -> Self {
484/// Self {
485/// base_dir,
486/// file_count: 0,
487/// }
488/// }
489///
490/// async fn write_data_file(
491/// &mut self,
492/// key: impl AsRef<str>,
493/// contents: String
494/// ) -> io::Result<PathBuf> {
495/// // Use the key as a part of the filename.
496/// let mut path = self.base_dir.to_path_buf();
497/// path.push(key.as_ref());
498///
499/// assert!(!path.exists(), "Path already exists: {path:?}");
500///
501/// // create the file at the path and write the contents to the file.
502/// fs::write(&path, contents).await?;
503/// self.file_count += 1;
504/// println!("Created a data file at {path:?} (file count: {})", self.file_count);
505/// Ok(path)
506/// }
507///
508/// async fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
509/// // Reads the contents of the file at the path, and return the contents.
510/// fs::read_to_string(path).await
511/// }
512///
513/// async fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
514/// // Remove the file at the path.
515/// fs::remove_file(path.as_ref()).await?;
516/// self.file_count -= 1;
517/// println!(
518/// "Removed a data file at {:?} (file count: {})",
519/// path.as_ref(),
520/// self.file_count
521/// );
522///
523/// Ok(())
524/// }
525/// }
526///
527/// #[tokio::main]
528/// async fn main() -> anyhow::Result<()> {
529/// // Create an instance of the DataFileManager and wrap it with
530/// // Arc<RwLock<_>> so it can be shared across threads.
531/// let mut base_dir = std::env::temp_dir();
532/// base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
533/// println!("base_dir: {base_dir:?}");
534/// std::fs::create_dir(&base_dir)?;
535///
536/// let file_mgr = DataFileManager::new(base_dir);
537/// let file_mgr = Arc::new(RwLock::new(file_mgr));
538///
539/// let file_mgr1 = Arc::clone(&file_mgr);
540/// let rt = tokio::runtime::Handle::current();
541///
542/// // Create an eviction listener closure.
543/// let eviction_listener = move |k, v: PathBuf, cause| -> ListenerFuture {
544/// println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
545/// let file_mgr2 = Arc::clone(&file_mgr1);
546///
547/// // Create a Future that removes the data file at the path `v`.
548/// async move {
549/// // Acquire the write lock of the DataFileManager.
550/// let mut mgr = file_mgr2.write().await;
551/// // Remove the data file. We must handle error cases here to
552/// // prevent the listener from panicking.
553/// if let Err(_e) = mgr.remove_data_file(v.as_path()).await {
554/// eprintln!("Failed to remove a data file at {v:?}");
555/// }
556/// }
557/// // Convert the regular Future into ListenerFuture. This method is
558/// // provided by moka::future::FutureExt trait.
559/// .boxed()
560/// };
561///
562/// // Create the cache. Set time to live for two seconds and set the
563/// // eviction listener.
564/// let cache = Cache::builder()
565/// .max_capacity(100)
566/// .time_to_live(Duration::from_secs(2))
567/// .async_eviction_listener(eviction_listener)
568/// .build();
569///
570/// // Insert an entry to the cache.
571/// // This will create and write a data file for the key "user1", store the
572/// // path of the file to the cache, and return it.
573/// println!("== try_get_with()");
574/// let key = "user1";
575/// let path = cache
576/// .try_get_with(key, async {
577/// let mut mgr = file_mgr.write().await;
578/// let path = mgr
579/// .write_data_file(key, "user data".into())
580/// .await
581/// .with_context(|| format!("Failed to create a data file"))?;
582/// Ok(path) as anyhow::Result<_>
583/// })
584/// .await
585/// .map_err(|e| anyhow!("{e}"))?;
586///
587/// // Read the data file at the path and print the contents.
588/// println!("\n== read_data_file()");
589/// {
590/// let mgr = file_mgr.read().await;
591/// let contents = mgr
592/// .read_data_file(path.as_path())
593/// .await
594/// .with_context(|| format!("Failed to read data from {path:?}"))?;
595/// println!("contents: {contents}");
596/// }
597///
598/// // Sleep for five seconds. While sleeping, the cache entry for key "user1"
599/// // will be expired and evicted, so the eviction listener will be called to
600/// // remove the file.
601/// tokio::time::sleep(Duration::from_secs(5)).await;
602///
603/// cache.run_pending_tasks();
604///
605/// Ok(())
606/// }
607/// ```
608///
609/// ## You should avoid eviction listener to panic
610///
611/// It is very important to make an eviction listener closure not to panic.
612/// Otherwise, the cache will stop calling the listener after a panic. This is an
613/// intended behavior because the cache cannot know whether it is memory safe or not
614/// to call the panicked listener again.
615///
616/// When a listener panics, the cache will swallow the panic and disable the
617/// listener. If you want to know when a listener panics and the reason of the panic,
618/// you can enable an optional `logging` feature of Moka and check error-level logs.
619///
620/// To enable the `logging`, do the followings:
621///
622/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
623/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
624/// `info`, ...):
625/// - If you are using the `env_logger` crate, you can achieve this by setting
626/// `RUST_LOG` environment variable to `moka=error`.
627/// 3. If you have more than one caches, you may want to set a distinct name for each
628/// cache by using cache builder's [`name`][builder-name-method] method. The name
629/// will appear in the log.
630///
631/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
632///
633pub struct Cache<K, V, S = RandomState> {
634 pub(crate) base: BaseCache<K, V, S>,
635 value_initializer: Arc<ValueInitializer<K, V, S>>,
636
637 #[cfg(test)]
638 schedule_write_op_should_block: AtomicBool,
639}
640
641unsafe impl<K, V, S> Send for Cache<K, V, S>
642where
643 K: Send + Sync,
644 V: Send + Sync,
645 S: Send,
646{
647}
648
649unsafe impl<K, V, S> Sync for Cache<K, V, S>
650where
651 K: Send + Sync,
652 V: Send + Sync,
653 S: Sync,
654{
655}
656
657// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
658impl<K, V, S> Clone for Cache<K, V, S> {
659 /// Makes a clone of this shared cache.
660 ///
661 /// This operation is cheap as it only creates thread-safe reference counted
662 /// pointers to the shared internal data structures.
663 fn clone(&self) -> Self {
664 Self {
665 base: self.base.clone(),
666 value_initializer: Arc::clone(&self.value_initializer),
667
668 #[cfg(test)]
669 schedule_write_op_should_block: AtomicBool::new(
670 self.schedule_write_op_should_block.load(Ordering::Acquire),
671 ),
672 }
673 }
674}
675
676impl<K, V, S> fmt::Debug for Cache<K, V, S>
677where
678 K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
679 V: fmt::Debug + Clone + Send + Sync + 'static,
680 // TODO: Remove these bounds from S.
681 S: BuildHasher + Clone + Send + Sync + 'static,
682{
683 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684 let mut d_map = f.debug_map();
685
686 for (k, v) in self {
687 d_map.entry(&k, &v);
688 }
689
690 d_map.finish()
691 }
692}
693
694impl<K, V, S> Cache<K, V, S> {
695 /// Returns cache’s name.
696 pub fn name(&self) -> Option<&str> {
697 self.base.name()
698 }
699
700 /// Returns a read-only cache policy of this cache.
701 ///
702 /// At this time, cache policy cannot be modified after cache creation.
703 /// A future version may support to modify it.
704 pub fn policy(&self) -> Policy {
705 self.base.policy()
706 }
707
708 /// Returns an approximate number of entries in this cache.
709 ///
710 /// The value returned is _an estimate_; the actual count may differ if there are
711 /// concurrent insertions or removals, or if some entries are pending removal due
712 /// to expiration. This inaccuracy can be mitigated by calling
713 /// `run_pending_tasks` first.
714 ///
715 /// # Example
716 ///
717 /// ```rust
718 /// // Cargo.toml
719 /// //
720 /// // [dependencies]
721 /// // moka = { version = "0.12", features = ["future"] }
722 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
723 /// use moka::future::Cache;
724 ///
725 /// #[tokio::main]
726 /// async fn main() {
727 /// let cache = Cache::new(10);
728 /// cache.insert('n', "Netherland Dwarf").await;
729 /// cache.insert('l', "Lop Eared").await;
730 /// cache.insert('d', "Dutch").await;
731 ///
732 /// // Ensure an entry exists.
733 /// assert!(cache.contains_key(&'n'));
734 ///
735 /// // However, followings may print stale number zeros instead of threes.
736 /// println!("{}", cache.entry_count()); // -> 0
737 /// println!("{}", cache.weighted_size()); // -> 0
738 ///
739 /// // To mitigate the inaccuracy, call `run_pending_tasks` to run pending
740 /// // internal tasks.
741 /// cache.run_pending_tasks().await;
742 ///
743 /// // Followings will print the actual numbers.
744 /// println!("{}", cache.entry_count()); // -> 3
745 /// println!("{}", cache.weighted_size()); // -> 3
746 /// }
747 /// ```
748 ///
749 pub fn entry_count(&self) -> u64 {
750 self.base.entry_count()
751 }
752
753 /// Returns an approximate total weighted size of entries in this cache.
754 ///
755 /// The value returned is _an estimate_; the actual size may differ if there are
756 /// concurrent insertions or removals, or if some entries are pending removal due
757 /// to expiration. This inaccuracy can be mitigated by calling
758 /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for a
759 /// sample code.
760 pub fn weighted_size(&self) -> u64 {
761 self.base.weighted_size()
762 }
763
764 #[cfg(feature = "unstable-debug-counters")]
765 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-debug-counters")))]
766 pub async fn debug_stats(&self) -> CacheDebugStats {
767 self.base.debug_stats().await
768 }
769}
770
771impl<K, V> Cache<K, V, RandomState>
772where
773 K: Hash + Eq + Send + Sync + 'static,
774 V: Clone + Send + Sync + 'static,
775{
776 /// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
777 ///
778 /// To adjust various configuration knobs such as `initial_capacity` or
779 /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
780 ///
781 /// [builder-struct]: ./struct.CacheBuilder.html
782 pub fn new(max_capacity: u64) -> Self {
783 let build_hasher = RandomState::default();
784 Self::with_everything(
785 None,
786 Some(max_capacity),
787 None,
788 build_hasher,
789 None,
790 EvictionPolicy::default(),
791 None,
792 ExpirationPolicy::default(),
793 HousekeeperConfig::default(),
794 false,
795 Clock::default(),
796 )
797 }
798
799 /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` with
800 /// various configuration knobs.
801 ///
802 /// [builder-struct]: ./struct.CacheBuilder.html
803 pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
804 CacheBuilder::default()
805 }
806}
807
808impl<K, V, S> Cache<K, V, S>
809where
810 K: Hash + Eq + Send + Sync + 'static,
811 V: Clone + Send + Sync + 'static,
812 S: BuildHasher + Clone + Send + Sync + 'static,
813{
814 // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
815 #[allow(clippy::too_many_arguments)]
816 pub(crate) fn with_everything(
817 name: Option<String>,
818 max_capacity: Option<u64>,
819 initial_capacity: Option<usize>,
820 build_hasher: S,
821 weigher: Option<Weigher<K, V>>,
822 eviction_policy: EvictionPolicy,
823 eviction_listener: Option<AsyncEvictionListener<K, V>>,
824 expiration_policy: ExpirationPolicy<K, V>,
825 housekeeper_config: HousekeeperConfig,
826 invalidator_enabled: bool,
827 clock: Clock,
828 ) -> Self {
829 Self {
830 base: BaseCache::new(
831 name,
832 max_capacity,
833 initial_capacity,
834 build_hasher.clone(),
835 weigher,
836 eviction_policy,
837 eviction_listener,
838 expiration_policy,
839 housekeeper_config,
840 invalidator_enabled,
841 clock,
842 ),
843 value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
844
845 #[cfg(test)]
846 schedule_write_op_should_block: Default::default(), // false
847 }
848 }
849
850 /// Returns `true` if the cache contains a value for the key.
851 ///
852 /// Unlike the `get` method, this method is not considered a cache read operation,
853 /// so it does not update the historic popularity estimator or reset the idle
854 /// timer for the key.
855 ///
856 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
857 /// on the borrowed form _must_ match those for the key type.
858 pub fn contains_key<Q>(&self, key: &Q) -> bool
859 where
860 Q: Equivalent<K> + Hash + ?Sized,
861 {
862 self.base.contains_key_with_hash(key, self.base.hash(key))
863 }
864
865 /// Returns a _clone_ of the value corresponding to the key.
866 ///
867 /// If you want to store values that will be expensive to clone, wrap them by
868 /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
869 /// thread-safe reference-counted pointer and its `clone()` method is cheap.
870 ///
871 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
872 /// on the borrowed form _must_ match those for the key type.
873 ///
874 /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
875 pub async fn get<Q>(&self, key: &Q) -> Option<V>
876 where
877 Q: Equivalent<K> + Hash + ?Sized,
878 {
879 let ignore_if = None as Option<&mut fn(&V) -> bool>;
880
881 self.base
882 .get_with_hash(key, self.base.hash(key), ignore_if, false, true)
883 .await
884 .map(Entry::into_value)
885 }
886
887 /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
888 /// select or insert an entry.
889 ///
890 /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
891 ///
892 /// # Example
893 ///
894 /// ```rust
895 /// // Cargo.toml
896 /// //
897 /// // [dependencies]
898 /// // moka = { version = "0.12", features = ["future"] }
899 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
900 ///
901 /// use moka::future::Cache;
902 ///
903 /// #[tokio::main]
904 /// async fn main() {
905 /// let cache: Cache<String, u32> = Cache::new(100);
906 /// let key = "key1".to_string();
907 ///
908 /// let entry = cache.entry(key.clone()).or_insert(3).await;
909 /// assert!(entry.is_fresh());
910 /// assert_eq!(entry.key(), &key);
911 /// assert_eq!(entry.into_value(), 3);
912 ///
913 /// let entry = cache.entry(key).or_insert(6).await;
914 /// // Not fresh because the value was already in the cache.
915 /// assert!(!entry.is_fresh());
916 /// assert_eq!(entry.into_value(), 3);
917 /// }
918 /// ```
919 pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
920 where
921 K: Hash + Eq,
922 {
923 let hash = self.base.hash(&key);
924 OwnedKeyEntrySelector::new(key, hash, self)
925 }
926
927 /// Takes a reference `&Q` of a key and returns an [`RefKeyEntrySelector`] that
928 /// can be used to select or insert an entry.
929 ///
930 /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
931 ///
932 /// # Example
933 ///
934 /// ```rust
935 /// // Cargo.toml
936 /// //
937 /// // [dependencies]
938 /// // moka = { version = "0.12", features = ["future"] }
939 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
940 ///
941 /// use moka::future::Cache;
942 ///
943 /// #[tokio::main]
944 /// async fn main() {
945 /// let cache: Cache<String, u32> = Cache::new(100);
946 /// let key = "key1".to_string();
947 ///
948 /// let entry = cache.entry_by_ref(&key).or_insert(3).await;
949 /// assert!(entry.is_fresh());
950 /// assert_eq!(entry.key(), &key);
951 /// assert_eq!(entry.into_value(), 3);
952 ///
953 /// let entry = cache.entry_by_ref(&key).or_insert(6).await;
954 /// // Not fresh because the value was already in the cache.
955 /// assert!(!entry.is_fresh());
956 /// assert_eq!(entry.into_value(), 3);
957 /// }
958 /// ```
959 pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
960 where
961 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
962 {
963 let hash = self.base.hash(key);
964 RefKeyEntrySelector::new(key, hash, self)
965 }
966
967 /// Returns a _clone_ of the value corresponding to the key. If the value does
968 /// not exist, resolve the `init` future and inserts the output.
969 ///
970 /// # Concurrent calls on the same key
971 ///
972 /// This method guarantees that concurrent calls on the same not-existing key are
973 /// coalesced into one evaluation of the `init` future. Only one of the calls
974 /// evaluates its future, and other calls wait for that future to resolve.
975 ///
976 /// The following code snippet demonstrates this behavior:
977 ///
978 /// ```rust
979 /// // Cargo.toml
980 /// //
981 /// // [dependencies]
982 /// // moka = { version = "0.12", features = ["future"] }
983 /// // futures-util = "0.3"
984 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
985 /// use moka::future::Cache;
986 /// use std::sync::Arc;
987 ///
988 /// #[tokio::main]
989 /// async fn main() {
990 /// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
991 /// let cache = Cache::new(100);
992 ///
993 /// // Spawn four async tasks.
994 /// let tasks: Vec<_> = (0..4_u8)
995 /// .map(|task_id| {
996 /// let my_cache = cache.clone();
997 /// tokio::spawn(async move {
998 /// println!("Task {task_id} started.");
999 ///
1000 /// // Insert and get the value for key1. Although all four async
1001 /// // tasks will call `get_with` at the same time, the `init`
1002 /// // async block must be resolved only once.
1003 /// let value = my_cache
1004 /// .get_with("key1", async move {
1005 /// println!("Task {task_id} inserting a value.");
1006 /// Arc::new(vec![0u8; TEN_MIB])
1007 /// })
1008 /// .await;
1009 ///
1010 /// // Ensure the value exists now.
1011 /// assert_eq!(value.len(), TEN_MIB);
1012 /// assert!(my_cache.get(&"key1").await.is_some());
1013 ///
1014 /// println!("Task {task_id} got the value. (len: {})", value.len());
1015 /// })
1016 /// })
1017 /// .collect();
1018 ///
1019 /// // Run all tasks concurrently and wait for them to complete.
1020 /// futures_util::future::join_all(tasks).await;
1021 /// }
1022 /// ```
1023 ///
1024 /// **A Sample Result**
1025 ///
1026 /// - The `init` future (async black) was resolved exactly once by task 3.
1027 /// - Other tasks were blocked until task 3 inserted the value.
1028 ///
1029 /// ```console
1030 /// Task 0 started.
1031 /// Task 3 started.
1032 /// Task 1 started.
1033 /// Task 2 started.
1034 /// Task 3 inserting a value.
1035 /// Task 3 got the value. (len: 10485760)
1036 /// Task 0 got the value. (len: 10485760)
1037 /// Task 1 got the value. (len: 10485760)
1038 /// Task 2 got the value. (len: 10485760)
1039 /// ```
1040 ///
1041 /// # Panics
1042 ///
1043 /// This method panics when the `init` future has panicked. When it happens, only
1044 /// the caller whose `init` future panicked will get the panic (e.g. only task 3
1045 /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1046 /// and 2 above), this method will restart and resolve one of the remaining
1047 /// `init` futures.
1048 ///
1049 pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V {
1050 futures_util::pin_mut!(init);
1051 let hash = self.base.hash(&key);
1052 let key = Arc::new(key);
1053 let replace_if = None as Option<fn(&V) -> bool>;
1054 self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
1055 .await
1056 .into_value()
1057 }
1058
1059 /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
1060 /// key, you can pass a reference to the key. If the key does not exist in the
1061 /// cache, the key will be cloned to create new entry in the cache.
1062 pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
1063 where
1064 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1065 {
1066 futures_util::pin_mut!(init);
1067 let hash = self.base.hash(key);
1068 let replace_if = None as Option<fn(&V) -> bool>;
1069 self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
1070 .await
1071 .into_value()
1072 }
1073
1074 /// TODO: Remove this in v0.13.0.
1075 /// Deprecated, replaced with
1076 /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
1077 #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
1078 pub async fn get_with_if(
1079 &self,
1080 key: K,
1081 init: impl Future<Output = V>,
1082 replace_if: impl FnMut(&V) -> bool + Send,
1083 ) -> V {
1084 futures_util::pin_mut!(init);
1085 let hash = self.base.hash(&key);
1086 let key = Arc::new(key);
1087 self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
1088 .await
1089 .into_value()
1090 }
1091
1092 /// Returns a _clone_ of the value corresponding to the key. If the value does
1093 /// not exist, resolves the `init` future, and inserts the value if `Some(value)`
1094 /// was returned. If `None` was returned from the future, this method does not
1095 /// insert a value and returns `None`.
1096 ///
1097 /// # Concurrent calls on the same key
1098 ///
1099 /// This method guarantees that concurrent calls on the same not-existing key are
1100 /// coalesced into one evaluation of the `init` future. Only one of the calls
1101 /// evaluates its future, and other calls wait for that future to resolve.
1102 ///
1103 /// The following code snippet demonstrates this behavior:
1104 ///
1105 /// ```rust
1106 /// // Cargo.toml
1107 /// //
1108 /// // [dependencies]
1109 /// // moka = { version = "0.12", features = ["future"] }
1110 /// // futures-util = "0.3"
1111 /// // reqwest = "0.11"
1112 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1113 /// use moka::future::Cache;
1114 ///
1115 /// // This async function tries to get HTML from the given URI.
1116 /// async fn get_html(task_id: u8, uri: &str) -> Option<String> {
1117 /// println!("get_html() called by task {task_id}.");
1118 /// reqwest::get(uri).await.ok()?.text().await.ok()
1119 /// }
1120 ///
1121 /// #[tokio::main]
1122 /// async fn main() {
1123 /// let cache = Cache::new(100);
1124 ///
1125 /// // Spawn four async tasks.
1126 /// let tasks: Vec<_> = (0..4_u8)
1127 /// .map(|task_id| {
1128 /// let my_cache = cache.clone();
1129 /// tokio::spawn(async move {
1130 /// println!("Task {task_id} started.");
1131 ///
1132 /// // Try to insert and get the value for key1. Although
1133 /// // all four async tasks will call `try_get_with`
1134 /// // at the same time, get_html() must be called only once.
1135 /// let value = my_cache
1136 /// .optionally_get_with(
1137 /// "key1",
1138 /// get_html(task_id, "https://www.rust-lang.org"),
1139 /// ).await;
1140 ///
1141 /// // Ensure the value exists now.
1142 /// assert!(value.is_some());
1143 /// assert!(my_cache.get(&"key1").await.is_some());
1144 ///
1145 /// println!(
1146 /// "Task {task_id} got the value. (len: {})",
1147 /// value.unwrap().len()
1148 /// );
1149 /// })
1150 /// })
1151 /// .collect();
1152 ///
1153 /// // Run all tasks concurrently and wait for them to complete.
1154 /// futures_util::future::join_all(tasks).await;
1155 /// }
1156 /// ```
1157 ///
1158 /// **A Sample Result**
1159 ///
1160 /// - `get_html()` was called exactly once by task 2.
1161 /// - Other tasks were blocked until task 2 inserted the value.
1162 ///
1163 /// ```console
1164 /// Task 1 started.
1165 /// Task 0 started.
1166 /// Task 2 started.
1167 /// Task 3 started.
1168 /// get_html() called by task 2.
1169 /// Task 2 got the value. (len: 19419)
1170 /// Task 1 got the value. (len: 19419)
1171 /// Task 0 got the value. (len: 19419)
1172 /// Task 3 got the value. (len: 19419)
1173 /// ```
1174 ///
1175 /// # Panics
1176 ///
1177 /// This method panics when the `init` future has panicked. When it happens, only
1178 /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1179 /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1180 /// and 3 above), this method will restart and resolve one of the remaining
1181 /// `init` futures.
1182 ///
1183 pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1184 where
1185 F: Future<Output = Option<V>>,
1186 {
1187 futures_util::pin_mut!(init);
1188 let hash = self.base.hash(&key);
1189 let key = Arc::new(key);
1190 self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1191 .await
1192 .map(Entry::into_value)
1193 }
1194
1195 /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1196 /// of passing an owned key, you can pass a reference to the key. If the key does
1197 /// not exist in the cache, the key will be cloned to create new entry in the
1198 /// cache.
1199 pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1200 where
1201 F: Future<Output = Option<V>>,
1202 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1203 {
1204 futures_util::pin_mut!(init);
1205 let hash = self.base.hash(key);
1206 self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1207 .await
1208 .map(Entry::into_value)
1209 }
1210
1211 /// Returns a _clone_ of the value corresponding to the key. If the value does
1212 /// not exist, resolves the `init` future, and inserts the value if `Ok(value)`
1213 /// was returned. If `Err(_)` was returned from the future, this method does not
1214 /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1215 ///
1216 /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1217 ///
1218 /// # Concurrent calls on the same key
1219 ///
1220 /// This method guarantees that concurrent calls on the same not-existing key are
1221 /// coalesced into one evaluation of the `init` future (as long as these
1222 /// futures return the same error type). Only one of the calls evaluates its
1223 /// future, and other calls wait for that future to resolve.
1224 ///
1225 /// The following code snippet demonstrates this behavior:
1226 ///
1227 /// ```rust
1228 /// // Cargo.toml
1229 /// //
1230 /// // [dependencies]
1231 /// // moka = { version = "0.12", features = ["future"] }
1232 /// // futures-util = "0.3"
1233 /// // reqwest = "0.11"
1234 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1235 /// use moka::future::Cache;
1236 ///
1237 /// // This async function tries to get HTML from the given URI.
1238 /// async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
1239 /// println!("get_html() called by task {task_id}.");
1240 /// reqwest::get(uri).await?.text().await
1241 /// }
1242 ///
1243 /// #[tokio::main]
1244 /// async fn main() {
1245 /// let cache = Cache::new(100);
1246 ///
1247 /// // Spawn four async tasks.
1248 /// let tasks: Vec<_> = (0..4_u8)
1249 /// .map(|task_id| {
1250 /// let my_cache = cache.clone();
1251 /// tokio::spawn(async move {
1252 /// println!("Task {task_id} started.");
1253 ///
1254 /// // Try to insert and get the value for key1. Although
1255 /// // all four async tasks will call `try_get_with`
1256 /// // at the same time, get_html() must be called only once.
1257 /// let value = my_cache
1258 /// .try_get_with(
1259 /// "key1",
1260 /// get_html(task_id, "https://www.rust-lang.org"),
1261 /// ).await;
1262 ///
1263 /// // Ensure the value exists now.
1264 /// assert!(value.is_ok());
1265 /// assert!(my_cache.get(&"key1").await.is_some());
1266 ///
1267 /// println!(
1268 /// "Task {task_id} got the value. (len: {})",
1269 /// value.unwrap().len()
1270 /// );
1271 /// })
1272 /// })
1273 /// .collect();
1274 ///
1275 /// // Run all tasks concurrently and wait for them to complete.
1276 /// futures_util::future::join_all(tasks).await;
1277 /// }
1278 /// ```
1279 ///
1280 /// **A Sample Result**
1281 ///
1282 /// - `get_html()` was called exactly once by task 2.
1283 /// - Other tasks were blocked until task 2 inserted the value.
1284 ///
1285 /// ```console
1286 /// Task 1 started.
1287 /// Task 0 started.
1288 /// Task 2 started.
1289 /// Task 3 started.
1290 /// get_html() called by task 2.
1291 /// Task 2 got the value. (len: 19419)
1292 /// Task 1 got the value. (len: 19419)
1293 /// Task 0 got the value. (len: 19419)
1294 /// Task 3 got the value. (len: 19419)
1295 /// ```
1296 ///
1297 /// # Panics
1298 ///
1299 /// This method panics when the `init` future has panicked. When it happens, only
1300 /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1301 /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1302 /// and 3 above), this method will restart and resolve one of the remaining
1303 /// `init` futures.
1304 ///
1305 pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1306 where
1307 F: Future<Output = Result<V, E>>,
1308 E: Send + Sync + 'static,
1309 {
1310 futures_util::pin_mut!(init);
1311 let hash = self.base.hash(&key);
1312 let key = Arc::new(key);
1313 self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1314 .await
1315 .map(Entry::into_value)
1316 }
1317
1318 /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1319 /// owned key, you can pass a reference to the key. If the key does not exist in
1320 /// the cache, the key will be cloned to create new entry in the cache.
1321 pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1322 where
1323 F: Future<Output = Result<V, E>>,
1324 E: Send + Sync + 'static,
1325 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1326 {
1327 futures_util::pin_mut!(init);
1328 let hash = self.base.hash(key);
1329 self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1330 .await
1331 .map(Entry::into_value)
1332 }
1333
1334 /// Inserts a key-value pair into the cache.
1335 ///
1336 /// If the cache has this key present, the value is updated.
1337 pub async fn insert(&self, key: K, value: V) {
1338 let hash = self.base.hash(&key);
1339 let key = Arc::new(key);
1340 self.insert_with_hash(key, hash, value).await;
1341 }
1342
1343 /// Discards any cached value for the key.
1344 ///
1345 /// If you need to get the value that has been discarded, use the
1346 /// [`remove`](#method.remove) method instead.
1347 ///
1348 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1349 /// on the borrowed form _must_ match those for the key type.
1350 pub async fn invalidate<Q>(&self, key: &Q)
1351 where
1352 Q: Equivalent<K> + Hash + ?Sized,
1353 {
1354 let hash = self.base.hash(key);
1355 self.invalidate_with_hash(key, hash, false).await;
1356 }
1357
1358 /// Discards any cached value for the key and returns a _clone_ of the value.
1359 ///
1360 /// If you do not need to get the value that has been discarded, use the
1361 /// [`invalidate`](#method.invalidate) method instead.
1362 ///
1363 /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1364 /// on the borrowed form _must_ match those for the key type.
1365 pub async fn remove<Q>(&self, key: &Q) -> Option<V>
1366 where
1367 Q: Equivalent<K> + Hash + ?Sized,
1368 {
1369 let hash = self.base.hash(key);
1370 self.invalidate_with_hash(key, hash, true).await
1371 }
1372
1373 /// Discards all cached values.
1374 ///
1375 /// This method returns immediately by just setting the current time as the
1376 /// invalidation time. `get` and other retrieval methods are guaranteed not to
1377 /// return the entries inserted before or at the invalidation time.
1378 ///
1379 /// The actual removal of the invalidated entries is done as a maintenance task
1380 /// driven by a user thread. For more details, see
1381 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1382 /// level documentation.
1383 ///
1384 /// Like the `invalidate` method, this method does not clear the historic
1385 /// popularity estimator of keys so that it retains the client activities of
1386 /// trying to retrieve an item.
1387 pub fn invalidate_all(&self) {
1388 self.base.invalidate_all();
1389 }
1390
1391 /// Discards cached values that satisfy a predicate.
1392 ///
1393 /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
1394 /// closure is called against each cached entry inserted before or at the time
1395 /// when this method was called. If the closure returns `true` that entry will be
1396 /// evicted from the cache.
1397 ///
1398 /// This method returns immediately by not actually removing the invalidated
1399 /// entries. Instead, it just sets the predicate to the cache with the time when
1400 /// this method was called. The actual removal of the invalidated entries is done
1401 /// as a maintenance task driven by a user thread. For more details, see
1402 /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1403 /// level documentation.
1404 ///
1405 /// Also the `get` and other retrieval methods will apply the closure to a cached
1406 /// entry to determine if it should have been invalidated. Therefore, it is
1407 /// guaranteed that these methods must not return invalidated values.
1408 ///
1409 /// Note that you must call
1410 /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
1411 /// at the cache creation time as the cache needs to maintain additional internal
1412 /// data structures to support this method. Otherwise, calling this method will
1413 /// fail with a
1414 /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
1415 ///
1416 /// Like the `invalidate` method, this method does not clear the historic
1417 /// popularity estimator of keys so that it retains the client activities of
1418 /// trying to retrieve an item.
1419 ///
1420 /// [support-invalidation-closures]:
1421 /// ./struct.CacheBuilder.html#method.support_invalidation_closures
1422 /// [invalidation-disabled-error]:
1423 /// ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
1424 pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
1425 where
1426 F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1427 {
1428 self.base.invalidate_entries_if(Arc::new(predicate))
1429 }
1430
1431 /// Creates an iterator visiting all key-value pairs in arbitrary order. The
1432 /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
1433 /// value.
1434 ///
1435 /// Iterators do not block concurrent reads and writes on the cache. An entry can
1436 /// be inserted to, invalidated or evicted from a cache while iterators are alive
1437 /// on the same cache.
1438 ///
1439 /// Unlike the `get` method, visiting entries via an iterator do not update the
1440 /// historic popularity estimator or reset idle timers for keys.
1441 ///
1442 /// # Guarantees
1443 ///
1444 /// In order to allow concurrent access to the cache, iterator's `next` method
1445 /// does _not_ guarantee the following:
1446 ///
1447 /// - It does not guarantee to return a key-value pair (an entry) if its key has
1448 /// been inserted to the cache _after_ the iterator was created.
1449 /// - Such an entry may or may not be returned depending on key's hash and
1450 /// timing.
1451 ///
1452 /// and the `next` method guarantees the followings:
1453 ///
1454 /// - It guarantees not to return the same entry more than once.
1455 /// - It guarantees not to return an entry if it has been removed from the cache
1456 /// after the iterator was created.
1457 /// - Note: An entry can be removed by following reasons:
1458 /// - Manually invalidated.
1459 /// - Expired (e.g. time-to-live).
1460 /// - Evicted as the cache capacity exceeded.
1461 ///
1462 /// # Examples
1463 ///
1464 /// ```rust
1465 /// // Cargo.toml
1466 /// //
1467 /// // [dependencies]
1468 /// // moka = { version = "0.12", features = ["future"] }
1469 /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1470 /// use moka::future::Cache;
1471 ///
1472 /// #[tokio::main]
1473 /// async fn main() {
1474 /// let cache = Cache::new(100);
1475 /// cache.insert("Julia", 14).await;
1476 ///
1477 /// let mut iter = cache.iter();
1478 /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
1479 /// assert_eq!(*k, "Julia");
1480 /// assert_eq!(v, 14);
1481 ///
1482 /// assert!(iter.next().is_none());
1483 /// }
1484 /// ```
1485 ///
1486 pub fn iter(&self) -> Iter<'_, K, V> {
1487 use crate::common::iter::{Iter as InnerIter, ScanningGet};
1488
1489 let inner = InnerIter::with_single_cache_segment(&self.base, self.base.num_cht_segments());
1490 Iter::new(inner)
1491 }
1492
1493 /// Performs any pending maintenance operations needed by the cache.
1494 pub async fn run_pending_tasks(&self) {
1495 if let Some(hk) = &self.base.housekeeper {
1496 self.base.retry_interrupted_ops().await;
1497 hk.run_pending_tasks(Arc::clone(&self.base.inner)).await;
1498 }
1499 }
1500}
1501
1502impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
1503where
1504 K: Hash + Eq + Send + Sync + 'static,
1505 V: Clone + Send + Sync + 'static,
1506 S: BuildHasher + Clone + Send + Sync + 'static,
1507{
1508 type Item = (Arc<K>, V);
1509
1510 type IntoIter = Iter<'a, K, V>;
1511
1512 fn into_iter(self) -> Self::IntoIter {
1513 self.iter()
1514 }
1515}
1516
1517//
1518// private methods
1519//
1520impl<K, V, S> Cache<K, V, S>
1521where
1522 K: Hash + Eq + Send + Sync + 'static,
1523 V: Clone + Send + Sync + 'static,
1524 S: BuildHasher + Clone + Send + Sync + 'static,
1525{
1526 pub(crate) async fn get_or_insert_with_hash_and_fun(
1527 &self,
1528 key: Arc<K>,
1529 hash: u64,
1530 init: Pin<&mut impl Future<Output = V>>,
1531 mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1532 need_key: bool,
1533 ) -> Entry<K, V> {
1534 let maybe_entry = self
1535 .base
1536 .get_with_hash(&*key, hash, replace_if.as_mut(), need_key, true)
1537 .await;
1538 if let Some(entry) = maybe_entry {
1539 entry
1540 } else {
1541 self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1542 .await
1543 }
1544 }
1545
1546 pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1547 &self,
1548 key: &Q,
1549 hash: u64,
1550 init: Pin<&mut impl Future<Output = V>>,
1551 mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1552 need_key: bool,
1553 ) -> Entry<K, V>
1554 where
1555 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1556 {
1557 let maybe_entry = self
1558 .base
1559 .get_with_hash(key, hash, replace_if.as_mut(), need_key, true)
1560 .await;
1561 if let Some(entry) = maybe_entry {
1562 entry
1563 } else {
1564 let key = Arc::new(key.to_owned());
1565 self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1566 .await
1567 }
1568 }
1569
1570 async fn insert_with_hash_and_fun(
1571 &self,
1572 key: Arc<K>,
1573 hash: u64,
1574 init: Pin<&mut impl Future<Output = V>>,
1575 replace_if: Option<impl FnMut(&V) -> bool + Send>,
1576 need_key: bool,
1577 ) -> Entry<K, V> {
1578 let k = if need_key {
1579 Some(Arc::clone(&key))
1580 } else {
1581 None
1582 };
1583
1584 let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1585 let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1586
1587 match self
1588 .value_initializer
1589 .try_init_or_read(&key, hash, type_id, self, replace_if, init, post_init)
1590 .await
1591 {
1592 InitResult::Initialized(v) => {
1593 crossbeam_epoch::pin().flush();
1594 Entry::new(k, v, true, false)
1595 }
1596 InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1597 InitResult::InitErr(_) => unreachable!(),
1598 }
1599 }
1600
1601 pub(crate) async fn get_or_insert_with_hash(
1602 &self,
1603 key: Arc<K>,
1604 hash: u64,
1605 init: impl FnOnce() -> V,
1606 ) -> Entry<K, V> {
1607 match self
1608 .base
1609 .get_with_hash(&*key, hash, never_ignore(), true, true)
1610 .await
1611 {
1612 Some(entry) => entry,
1613 None => {
1614 let value = init();
1615 self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1616 .await;
1617 Entry::new(Some(key), value, true, false)
1618 }
1619 }
1620 }
1621
1622 pub(crate) async fn get_or_insert_with_hash_by_ref<Q>(
1623 &self,
1624 key: &Q,
1625 hash: u64,
1626 init: impl FnOnce() -> V,
1627 ) -> Entry<K, V>
1628 where
1629 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1630 {
1631 match self
1632 .base
1633 .get_with_hash(key, hash, never_ignore(), true, true)
1634 .await
1635 {
1636 Some(entry) => entry,
1637 None => {
1638 let key = Arc::new(key.to_owned());
1639 let value = init();
1640 self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1641 .await;
1642 Entry::new(Some(key), value, true, false)
1643 }
1644 }
1645 }
1646
1647 pub(crate) async fn get_or_optionally_insert_with_hash_and_fun<F>(
1648 &self,
1649 key: Arc<K>,
1650 hash: u64,
1651 init: Pin<&mut F>,
1652 need_key: bool,
1653 ) -> Option<Entry<K, V>>
1654 where
1655 F: Future<Output = Option<V>>,
1656 {
1657 let entry = self
1658 .base
1659 .get_with_hash(&*key, hash, never_ignore(), need_key, true)
1660 .await;
1661 if entry.is_some() {
1662 return entry;
1663 }
1664
1665 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1666 .await
1667 }
1668
1669 pub(crate) async fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1670 &self,
1671 key: &Q,
1672 hash: u64,
1673 init: Pin<&mut F>,
1674 need_key: bool,
1675 ) -> Option<Entry<K, V>>
1676 where
1677 F: Future<Output = Option<V>>,
1678 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1679 {
1680 let entry = self
1681 .base
1682 .get_with_hash(key, hash, never_ignore(), need_key, true)
1683 .await;
1684 if entry.is_some() {
1685 return entry;
1686 }
1687
1688 let key = Arc::new(key.to_owned());
1689 self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1690 .await
1691 }
1692
1693 async fn optionally_insert_with_hash_and_fun<F>(
1694 &self,
1695 key: Arc<K>,
1696 hash: u64,
1697 init: Pin<&mut F>,
1698 need_key: bool,
1699 ) -> Option<Entry<K, V>>
1700 where
1701 F: Future<Output = Option<V>>,
1702 {
1703 let k = if need_key {
1704 Some(Arc::clone(&key))
1705 } else {
1706 None
1707 };
1708
1709 let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1710 let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1711
1712 match self
1713 .value_initializer
1714 .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1715 .await
1716 {
1717 InitResult::Initialized(v) => {
1718 crossbeam_epoch::pin().flush();
1719 Some(Entry::new(k, v, true, false))
1720 }
1721 InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1722 InitResult::InitErr(_) => None,
1723 }
1724 }
1725
1726 pub(super) async fn get_or_try_insert_with_hash_and_fun<F, E>(
1727 &self,
1728 key: Arc<K>,
1729 hash: u64,
1730 init: Pin<&mut F>,
1731 need_key: bool,
1732 ) -> Result<Entry<K, V>, Arc<E>>
1733 where
1734 F: Future<Output = Result<V, E>>,
1735 E: Send + Sync + 'static,
1736 {
1737 if let Some(entry) = self
1738 .base
1739 .get_with_hash(&*key, hash, never_ignore(), need_key, true)
1740 .await
1741 {
1742 return Ok(entry);
1743 }
1744
1745 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1746 .await
1747 }
1748
1749 pub(super) async fn get_or_try_insert_with_hash_by_ref_and_fun<F, E, Q>(
1750 &self,
1751 key: &Q,
1752 hash: u64,
1753 init: Pin<&mut F>,
1754 need_key: bool,
1755 ) -> Result<Entry<K, V>, Arc<E>>
1756 where
1757 F: Future<Output = Result<V, E>>,
1758 E: Send + Sync + 'static,
1759 Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1760 {
1761 if let Some(entry) = self
1762 .base
1763 .get_with_hash(key, hash, never_ignore(), need_key, true)
1764 .await
1765 {
1766 return Ok(entry);
1767 }
1768 let key = Arc::new(key.to_owned());
1769 self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1770 .await
1771 }
1772
1773 async fn try_insert_with_hash_and_fun<F, E>(
1774 &self,
1775 key: Arc<K>,
1776 hash: u64,
1777 init: Pin<&mut F>,
1778 need_key: bool,
1779 ) -> Result<Entry<K, V>, Arc<E>>
1780 where
1781 F: Future<Output = Result<V, E>>,
1782 E: Send + Sync + 'static,
1783 {
1784 let k = if need_key {
1785 Some(Arc::clone(&key))
1786 } else {
1787 None
1788 };
1789
1790 let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1791 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1792
1793 match self
1794 .value_initializer
1795 .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1796 .await
1797 {
1798 InitResult::Initialized(v) => {
1799 crossbeam_epoch::pin().flush();
1800 Ok(Entry::new(k, v, true, false))
1801 }
1802 InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1803 InitResult::InitErr(e) => {
1804 crossbeam_epoch::pin().flush();
1805 Err(e)
1806 }
1807 }
1808 }
1809
1810 pub(crate) async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
1811 if self.base.is_map_disabled() {
1812 return;
1813 }
1814
1815 let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await;
1816 let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts);
1817 cancel_guard.set_op(op.clone());
1818
1819 let should_block;
1820 #[cfg(not(test))]
1821 {
1822 should_block = false;
1823 }
1824 #[cfg(test)]
1825 {
1826 should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
1827 }
1828
1829 let hk = self.base.housekeeper.as_ref();
1830 let event = self.base.write_op_ch_ready_event();
1831
1832 BaseCache::<K, V, S>::schedule_write_op(
1833 &self.base.inner,
1834 &self.base.write_op_ch,
1835 event,
1836 op,
1837 ts,
1838 hk,
1839 should_block,
1840 )
1841 .await
1842 .expect("Failed to schedule write op for insert");
1843 cancel_guard.clear();
1844 }
1845
1846 pub(crate) async fn compute_with_hash_and_fun<F, Fut>(
1847 &self,
1848 key: Arc<K>,
1849 hash: u64,
1850 f: F,
1851 ) -> compute::CompResult<K, V>
1852 where
1853 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1854 Fut: Future<Output = compute::Op<V>>,
1855 {
1856 let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1857 match self
1858 .value_initializer
1859 .try_compute(key, hash, self, f, post_init, true)
1860 .await
1861 {
1862 Ok(result) => result,
1863 Err(_) => unreachable!(),
1864 }
1865 }
1866
1867 pub(crate) async fn try_compute_with_hash_and_fun<F, Fut, E>(
1868 &self,
1869 key: Arc<K>,
1870 hash: u64,
1871 f: F,
1872 ) -> Result<compute::CompResult<K, V>, E>
1873 where
1874 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1875 Fut: Future<Output = Result<compute::Op<V>, E>>,
1876 E: Send + Sync + 'static,
1877 {
1878 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1879 self.value_initializer
1880 .try_compute(key, hash, self, f, post_init, true)
1881 .await
1882 }
1883
1884 pub(crate) async fn try_compute_if_nobody_else_with_hash_and_fun<F, Fut, E>(
1885 &self,
1886 key: Arc<K>,
1887 hash: u64,
1888 f: F,
1889 ) -> Result<compute::CompResult<K, V>, E>
1890 where
1891 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1892 Fut: Future<Output = Result<compute::Op<V>, E>>,
1893 E: Send + Sync + 'static,
1894 {
1895 let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with_if_nobody_else;
1896 self.value_initializer
1897 .try_compute_if_nobody_else(key, hash, self, f, post_init, true)
1898 .await
1899 }
1900
1901 pub(crate) async fn upsert_with_hash_and_fun<F, Fut>(
1902 &self,
1903 key: Arc<K>,
1904 hash: u64,
1905 f: F,
1906 ) -> Entry<K, V>
1907 where
1908 F: FnOnce(Option<Entry<K, V>>) -> Fut,
1909 Fut: Future<Output = V>,
1910 {
1911 let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1912 match self
1913 .value_initializer
1914 .try_compute(key, hash, self, f, post_init, false)
1915 .await
1916 {
1917 Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1918 _ => unreachable!(),
1919 }
1920 }
1921
1922 pub(crate) async fn invalidate_with_hash<Q>(
1923 &self,
1924 key: &Q,
1925 hash: u64,
1926 need_value: bool,
1927 ) -> Option<V>
1928 where
1929 Q: Equivalent<K> + Hash + ?Sized,
1930 {
1931 use futures_util::FutureExt;
1932
1933 self.base.retry_interrupted_ops().await;
1934
1935 // Lock the key for removal if blocking removal notification is enabled.
1936 let mut kl = None;
1937 let mut klg = None;
1938 if self.base.is_removal_notifier_enabled() {
1939 // To lock the key, we have to get Arc<K> for key (&Q).
1940 //
1941 // TODO: Enhance this if possible. This is rather hack now because
1942 // it cannot prevent race conditions like this:
1943 //
1944 // 1. We miss the key because it does not exist. So we do not lock
1945 // the key.
1946 // 2. Somebody else (other thread) inserts the key.
1947 // 3. We remove the entry for the key, but without the key lock!
1948 //
1949 if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
1950 kl = self.base.maybe_key_lock(&arc_key);
1951 klg = if let Some(lock) = &kl {
1952 Some(lock.lock().await)
1953 } else {
1954 None
1955 };
1956 }
1957 }
1958
1959 match self.base.remove_entry(key, hash) {
1960 None => None,
1961 Some(kv) => {
1962 let now = self.base.current_time();
1963
1964 let maybe_v = if need_value {
1965 Some(kv.entry.value.clone())
1966 } else {
1967 None
1968 };
1969
1970 let info = kv.entry.entry_info();
1971 let entry_gen = info.incr_entry_gen();
1972
1973 let op: WriteOp<K, V> = WriteOp::Remove {
1974 kv_entry: kv.clone(),
1975 entry_gen,
1976 };
1977
1978 // Async Cancellation Safety: To ensure the below future should be
1979 // executed even if our caller async task is cancelled, we create a
1980 // cancel guard for the future (and the op). If our caller is
1981 // cancelled while we are awaiting for the future, the cancel guard
1982 // will save the future and the op to the interrupted_op_ch channel,
1983 // so that we can resume/retry later.
1984 let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);
1985
1986 if self.base.is_removal_notifier_enabled() {
1987 let future = self
1988 .base
1989 .notify_invalidate(&kv.key, &kv.entry)
1990 .boxed()
1991 .shared();
1992 cancel_guard.set_future_and_op(future.clone(), op.clone());
1993 // Send notification to the eviction listener.
1994 future.await;
1995 cancel_guard.unset_future();
1996 } else {
1997 cancel_guard.set_op(op.clone());
1998 }
1999
2000 // Drop the locks before scheduling write op to avoid a potential
2001 // dead lock. (Scheduling write can do spin lock when the queue is
2002 // full, and queue will be drained by the housekeeping thread that
2003 // can lock the same key)
2004 std::mem::drop(klg);
2005 std::mem::drop(kl);
2006
2007 let should_block;
2008 #[cfg(not(test))]
2009 {
2010 should_block = false;
2011 }
2012 #[cfg(test)]
2013 {
2014 should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
2015 }
2016
2017 let event = self.base.write_op_ch_ready_event();
2018 let hk = self.base.housekeeper.as_ref();
2019
2020 BaseCache::<K, V, S>::schedule_write_op(
2021 &self.base.inner,
2022 &self.base.write_op_ch,
2023 event,
2024 op,
2025 now,
2026 hk,
2027 should_block,
2028 )
2029 .await
2030 .expect("Failed to schedule write op for remove");
2031 cancel_guard.clear();
2032
2033 crossbeam_epoch::pin().flush();
2034 maybe_v
2035 }
2036 }
2037 }
2038}
2039
2040// For unit tests.
2041// For unit tests.
2042#[cfg(test)]
2043impl<K, V, S> Cache<K, V, S> {
2044 pub(crate) fn is_table_empty(&self) -> bool {
2045 self.entry_count() == 0
2046 }
2047
2048 pub(crate) fn is_waiter_map_empty(&self) -> bool {
2049 self.value_initializer.waiter_count() == 0
2050 }
2051}
2052
2053#[cfg(test)]
2054impl<K, V, S> Cache<K, V, S>
2055where
2056 K: Hash + Eq + Send + Sync + 'static,
2057 V: Clone + Send + Sync + 'static,
2058 S: BuildHasher + Clone + Send + Sync + 'static,
2059{
2060 fn invalidation_predicate_count(&self) -> usize {
2061 self.base.invalidation_predicate_count()
2062 }
2063
2064 async fn reconfigure_for_testing(&mut self) {
2065 self.base.reconfigure_for_testing().await;
2066 }
2067
2068 fn key_locks_map_is_empty(&self) -> bool {
2069 self.base.key_locks_map_is_empty()
2070 }
2071
2072 fn run_pending_tasks_initiation_count(&self) -> usize {
2073 self.base
2074 .housekeeper
2075 .as_ref()
2076 .map(|hk| hk.start_count.load(Ordering::Acquire))
2077 .expect("housekeeper is not set")
2078 }
2079
2080 fn run_pending_tasks_completion_count(&self) -> usize {
2081 self.base
2082 .housekeeper
2083 .as_ref()
2084 .map(|hk| hk.complete_count.load(Ordering::Acquire))
2085 .expect("housekeeper is not set")
2086 }
2087}
2088
2089// AS of Rust 1.71, we cannot make this function into a `const fn` because mutable
2090// references are not allowed.
2091// See [#57349](https://github.com/rust-lang/rust/issues/57349).
2092#[inline]
2093fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
2094 None
2095}
2096
2097// To see the debug prints, run test as `cargo test -- --nocapture`
2098#[cfg(test)]
2099mod tests {
2100 use super::Cache;
2101 use crate::{
2102 common::{time::Clock, HousekeeperConfig},
2103 future::FutureExt,
2104 notification::{ListenerFuture, RemovalCause},
2105 ops::compute,
2106 policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
2107 Expiry,
2108 };
2109
2110 use async_lock::{Barrier, Mutex};
2111 use std::{
2112 convert::Infallible,
2113 sync::{
2114 atomic::{AtomicU32, AtomicU8, Ordering},
2115 Arc,
2116 },
2117 time::{Duration, Instant as StdInstant},
2118 vec,
2119 };
2120 use tokio::time::sleep;
2121
2122 #[test]
2123 fn futures_are_send() {
2124 let cache = Cache::new(0);
2125
2126 fn is_send(_: impl Send) {}
2127
2128 // pub fns
2129 is_send(cache.get(&()));
2130 is_send(cache.get_with((), async {}));
2131 is_send(cache.get_with_by_ref(&(), async {}));
2132 #[allow(deprecated)]
2133 is_send(cache.get_with_if((), async {}, |_| false));
2134 is_send(cache.insert((), ()));
2135 is_send(cache.invalidate(&()));
2136 is_send(cache.optionally_get_with((), async { None }));
2137 is_send(cache.optionally_get_with_by_ref(&(), async { None }));
2138 is_send(cache.remove(&()));
2139 is_send(cache.run_pending_tasks());
2140 is_send(cache.try_get_with((), async { Err(()) }));
2141 is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));
2142
2143 // entry fns
2144 is_send(
2145 cache
2146 .entry(())
2147 .and_compute_with(|_| async { compute::Op::Nop }),
2148 );
2149 is_send(
2150 cache
2151 .entry(())
2152 .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
2153 );
2154 is_send(cache.entry(()).and_upsert_with(|_| async {}));
2155 is_send(cache.entry(()).or_default());
2156 is_send(cache.entry(()).or_insert(()));
2157 is_send(cache.entry(()).or_insert_with(async {}));
2158 is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
2159 is_send(cache.entry(()).or_optionally_insert_with(async { None }));
2160 is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));
2161
2162 // entry_by_ref fns
2163 is_send(
2164 cache
2165 .entry_by_ref(&())
2166 .and_compute_with(|_| async { compute::Op::Nop }),
2167 );
2168 is_send(
2169 cache
2170 .entry_by_ref(&())
2171 .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
2172 );
2173 is_send(cache.entry_by_ref(&()).and_upsert_with(|_| async {}));
2174 is_send(cache.entry_by_ref(&()).or_default());
2175 is_send(cache.entry_by_ref(&()).or_insert(()));
2176 is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
2177 is_send(
2178 cache
2179 .entry_by_ref(&())
2180 .or_insert_with_if(async {}, |_| false),
2181 );
2182 is_send(
2183 cache
2184 .entry_by_ref(&())
2185 .or_optionally_insert_with(async { None }),
2186 );
2187 is_send(
2188 cache
2189 .entry_by_ref(&())
2190 .or_try_insert_with(async { Err(()) }),
2191 );
2192 }
2193
2194 #[tokio::test]
2195 async fn max_capacity_zero() {
2196 let mut cache = Cache::new(0);
2197 cache.reconfigure_for_testing().await;
2198
2199 // Make the cache exterior immutable.
2200 let cache = cache;
2201
2202 cache.insert(0, ()).await;
2203
2204 assert!(!cache.contains_key(&0));
2205 assert!(cache.get(&0).await.is_none());
2206 cache.run_pending_tasks().await;
2207 assert!(!cache.contains_key(&0));
2208 assert!(cache.get(&0).await.is_none());
2209 assert_eq!(cache.entry_count(), 0)
2210 }
2211
2212 #[tokio::test]
2213 async fn basic_single_async_task() {
2214 // The following `Vec`s will hold actual and expected notifications.
2215 let actual = Arc::new(Mutex::new(Vec::new()));
2216 let mut expected = Vec::new();
2217
2218 // Create an eviction listener.
2219 let a1 = Arc::clone(&actual);
2220 let listener = move |k, v, cause| -> ListenerFuture {
2221 let a2 = Arc::clone(&a1);
2222 async move {
2223 a2.lock().await.push((k, v, cause));
2224 }
2225 .boxed()
2226 };
2227
2228 // Create a cache with the eviction listener.
2229 let mut cache = Cache::builder()
2230 .max_capacity(3)
2231 .async_eviction_listener(listener)
2232 .build();
2233 cache.reconfigure_for_testing().await;
2234
2235 // Make the cache exterior immutable.
2236 let cache = cache;
2237
2238 cache.insert("a", "alice").await;
2239 cache.insert("b", "bob").await;
2240 assert_eq!(cache.get(&"a").await, Some("alice"));
2241 assert!(cache.contains_key(&"a"));
2242 assert!(cache.contains_key(&"b"));
2243 assert_eq!(cache.get(&"b").await, Some("bob"));
2244 cache.run_pending_tasks().await;
2245 // counts: a -> 1, b -> 1
2246
2247 cache.insert("c", "cindy").await;
2248 assert_eq!(cache.get(&"c").await, Some("cindy"));
2249 assert!(cache.contains_key(&"c"));
2250 // counts: a -> 1, b -> 1, c -> 1
2251 cache.run_pending_tasks().await;
2252
2253 assert!(cache.contains_key(&"a"));
2254 assert_eq!(cache.get(&"a").await, Some("alice"));
2255 assert_eq!(cache.get(&"b").await, Some("bob"));
2256 assert!(cache.contains_key(&"b"));
2257 cache.run_pending_tasks().await;
2258 // counts: a -> 2, b -> 2, c -> 1
2259
2260 // "d" should not be admitted because its frequency is too low.
2261 cache.insert("d", "david").await; // count: d -> 0
2262 expected.push((Arc::new("d"), "david", RemovalCause::Size));
2263 cache.run_pending_tasks().await;
2264 assert_eq!(cache.get(&"d").await, None); // d -> 1
2265 assert!(!cache.contains_key(&"d"));
2266
2267 cache.insert("d", "david").await;
2268 expected.push((Arc::new("d"), "david", RemovalCause::Size));
2269 cache.run_pending_tasks().await;
2270 assert!(!cache.contains_key(&"d"));
2271 assert_eq!(cache.get(&"d").await, None); // d -> 2
2272
2273 // "d" should be admitted and "c" should be evicted
2274 // because d's frequency is higher than c's.
2275 cache.insert("d", "dennis").await;
2276 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2277 cache.run_pending_tasks().await;
2278 assert_eq!(cache.get(&"a").await, Some("alice"));
2279 assert_eq!(cache.get(&"b").await, Some("bob"));
2280 assert_eq!(cache.get(&"c").await, None);
2281 assert_eq!(cache.get(&"d").await, Some("dennis"));
2282 assert!(cache.contains_key(&"a"));
2283 assert!(cache.contains_key(&"b"));
2284 assert!(!cache.contains_key(&"c"));
2285 assert!(cache.contains_key(&"d"));
2286
2287 cache.invalidate(&"b").await;
2288 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2289 cache.run_pending_tasks().await;
2290 assert_eq!(cache.get(&"b").await, None);
2291 assert!(!cache.contains_key(&"b"));
2292
2293 assert!(cache.remove(&"b").await.is_none());
2294 assert_eq!(cache.remove(&"d").await, Some("dennis"));
2295 expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
2296 cache.run_pending_tasks().await;
2297 assert_eq!(cache.get(&"d").await, None);
2298 assert!(!cache.contains_key(&"d"));
2299
2300 verify_notification_vec(&cache, actual, &expected).await;
2301 assert!(cache.key_locks_map_is_empty());
2302 }
2303
2304 #[tokio::test]
2305 async fn basic_lru_single_thread() {
2306 // The following `Vec`s will hold actual and expected notifications.
2307 let actual = Arc::new(Mutex::new(Vec::new()));
2308 let mut expected = Vec::new();
2309
2310 // Create an eviction listener.
2311 let a1 = Arc::clone(&actual);
2312 let listener = move |k, v, cause| -> ListenerFuture {
2313 let a2 = Arc::clone(&a1);
2314 async move {
2315 a2.lock().await.push((k, v, cause));
2316 }
2317 .boxed()
2318 };
2319
2320 // Create a cache with the eviction listener.
2321 let mut cache = Cache::builder()
2322 .max_capacity(3)
2323 .eviction_policy(EvictionPolicy::lru())
2324 .async_eviction_listener(listener)
2325 .build();
2326 cache.reconfigure_for_testing().await;
2327
2328 // Make the cache exterior immutable.
2329 let cache = cache;
2330
2331 cache.insert("a", "alice").await;
2332 cache.insert("b", "bob").await;
2333 assert_eq!(cache.get(&"a").await, Some("alice"));
2334 assert!(cache.contains_key(&"a"));
2335 assert!(cache.contains_key(&"b"));
2336 assert_eq!(cache.get(&"b").await, Some("bob"));
2337 cache.run_pending_tasks().await;
2338 // a -> b
2339
2340 cache.insert("c", "cindy").await;
2341 assert_eq!(cache.get(&"c").await, Some("cindy"));
2342 assert!(cache.contains_key(&"c"));
2343 cache.run_pending_tasks().await;
2344 // a -> b -> c
2345
2346 assert!(cache.contains_key(&"a"));
2347 assert_eq!(cache.get(&"a").await, Some("alice"));
2348 assert_eq!(cache.get(&"b").await, Some("bob"));
2349 assert!(cache.contains_key(&"b"));
2350 cache.run_pending_tasks().await;
2351 // c -> a -> b
2352
2353 // "d" should be admitted because the cache uses the LRU strategy.
2354 cache.insert("d", "david").await;
2355 // "c" is the LRU and should have be evicted.
2356 expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2357 cache.run_pending_tasks().await;
2358
2359 assert_eq!(cache.get(&"a").await, Some("alice"));
2360 assert_eq!(cache.get(&"b").await, Some("bob"));
2361 assert_eq!(cache.get(&"c").await, None);
2362 assert_eq!(cache.get(&"d").await, Some("david"));
2363 assert!(cache.contains_key(&"a"));
2364 assert!(cache.contains_key(&"b"));
2365 assert!(!cache.contains_key(&"c"));
2366 assert!(cache.contains_key(&"d"));
2367 cache.run_pending_tasks().await;
2368 // a -> b -> d
2369
2370 cache.invalidate(&"b").await;
2371 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2372 cache.run_pending_tasks().await;
2373 // a -> d
2374 assert_eq!(cache.get(&"b").await, None);
2375 assert!(!cache.contains_key(&"b"));
2376
2377 assert!(cache.remove(&"b").await.is_none());
2378 assert_eq!(cache.remove(&"d").await, Some("david"));
2379 expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
2380 cache.run_pending_tasks().await;
2381 // a
2382 assert_eq!(cache.get(&"d").await, None);
2383 assert!(!cache.contains_key(&"d"));
2384
2385 cache.insert("e", "emily").await;
2386 cache.insert("f", "frank").await;
2387 // "a" should be evicted because it is the LRU.
2388 cache.insert("g", "gina").await;
2389 expected.push((Arc::new("a"), "alice", RemovalCause::Size));
2390 cache.run_pending_tasks().await;
2391 // e -> f -> g
2392 assert_eq!(cache.get(&"a").await, None);
2393 assert_eq!(cache.get(&"e").await, Some("emily"));
2394 assert_eq!(cache.get(&"f").await, Some("frank"));
2395 assert_eq!(cache.get(&"g").await, Some("gina"));
2396 assert!(!cache.contains_key(&"a"));
2397 assert!(cache.contains_key(&"e"));
2398 assert!(cache.contains_key(&"f"));
2399 assert!(cache.contains_key(&"g"));
2400
2401 verify_notification_vec(&cache, actual, &expected).await;
2402 assert!(cache.key_locks_map_is_empty());
2403 }
2404
2405 #[tokio::test]
2406 async fn size_aware_eviction() {
2407 let weigher = |_k: &&str, v: &(&str, u32)| v.1;
2408
2409 let alice = ("alice", 10);
2410 let bob = ("bob", 15);
2411 let bill = ("bill", 20);
2412 let cindy = ("cindy", 5);
2413 let david = ("david", 15);
2414 let dennis = ("dennis", 15);
2415
2416 // The following `Vec`s will hold actual and expected notifications.
2417 let actual = Arc::new(Mutex::new(Vec::new()));
2418 let mut expected = Vec::new();
2419
2420 // Create an eviction listener.
2421 let a1 = Arc::clone(&actual);
2422 let listener = move |k, v, cause| -> ListenerFuture {
2423 let a2 = Arc::clone(&a1);
2424 async move {
2425 a2.lock().await.push((k, v, cause));
2426 }
2427 .boxed()
2428 };
2429
2430 // Create a cache with the eviction listener.
2431 let mut cache = Cache::builder()
2432 .max_capacity(31)
2433 .weigher(weigher)
2434 .async_eviction_listener(listener)
2435 .build();
2436 cache.reconfigure_for_testing().await;
2437
2438 // Make the cache exterior immutable.
2439 let cache = cache;
2440
2441 cache.insert("a", alice).await;
2442 cache.insert("b", bob).await;
2443 assert_eq!(cache.get(&"a").await, Some(alice));
2444 assert!(cache.contains_key(&"a"));
2445 assert!(cache.contains_key(&"b"));
2446 assert_eq!(cache.get(&"b").await, Some(bob));
2447 cache.run_pending_tasks().await;
2448 // order (LRU -> MRU) and counts: a -> 1, b -> 1
2449
2450 cache.insert("c", cindy).await;
2451 assert_eq!(cache.get(&"c").await, Some(cindy));
2452 assert!(cache.contains_key(&"c"));
2453 // order and counts: a -> 1, b -> 1, c -> 1
2454 cache.run_pending_tasks().await;
2455
2456 assert!(cache.contains_key(&"a"));
2457 assert_eq!(cache.get(&"a").await, Some(alice));
2458 assert_eq!(cache.get(&"b").await, Some(bob));
2459 assert!(cache.contains_key(&"b"));
2460 cache.run_pending_tasks().await;
2461 // order and counts: c -> 1, a -> 2, b -> 2
2462
2463 // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
2464 // "d" must have higher count than 3, which is the aggregated count
2465 // of "a" and "c".
2466 cache.insert("d", david).await; // count: d -> 0
2467 expected.push((Arc::new("d"), david, RemovalCause::Size));
2468 cache.run_pending_tasks().await;
2469 assert_eq!(cache.get(&"d").await, None); // d -> 1
2470 assert!(!cache.contains_key(&"d"));
2471
2472 cache.insert("d", david).await;
2473 expected.push((Arc::new("d"), david, RemovalCause::Size));
2474 cache.run_pending_tasks().await;
2475 assert!(!cache.contains_key(&"d"));
2476 assert_eq!(cache.get(&"d").await, None); // d -> 2
2477
2478 cache.insert("d", david).await;
2479 expected.push((Arc::new("d"), david, RemovalCause::Size));
2480 cache.run_pending_tasks().await;
2481 assert_eq!(cache.get(&"d").await, None); // d -> 3
2482 assert!(!cache.contains_key(&"d"));
2483
2484 cache.insert("d", david).await;
2485 expected.push((Arc::new("d"), david, RemovalCause::Size));
2486 cache.run_pending_tasks().await;
2487 assert!(!cache.contains_key(&"d"));
2488 assert_eq!(cache.get(&"d").await, None); // d -> 4
2489
2490 // Finally "d" should be admitted by evicting "c" and "a".
2491 cache.insert("d", dennis).await;
2492 expected.push((Arc::new("c"), cindy, RemovalCause::Size));
2493 expected.push((Arc::new("a"), alice, RemovalCause::Size));
2494 cache.run_pending_tasks().await;
2495 assert_eq!(cache.get(&"a").await, None);
2496 assert_eq!(cache.get(&"b").await, Some(bob));
2497 assert_eq!(cache.get(&"c").await, None);
2498 assert_eq!(cache.get(&"d").await, Some(dennis));
2499 assert!(!cache.contains_key(&"a"));
2500 assert!(cache.contains_key(&"b"));
2501 assert!(!cache.contains_key(&"c"));
2502 assert!(cache.contains_key(&"d"));
2503
2504 // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
2505 cache.insert("b", bill).await;
2506 expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
2507 expected.push((Arc::new("d"), dennis, RemovalCause::Size));
2508 cache.run_pending_tasks().await;
2509 assert_eq!(cache.get(&"b").await, Some(bill));
2510 assert_eq!(cache.get(&"d").await, None);
2511 assert!(cache.contains_key(&"b"));
2512 assert!(!cache.contains_key(&"d"));
2513
2514 // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
2515 cache.insert("a", alice).await;
2516 cache.insert("b", bob).await;
2517 expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
2518 cache.run_pending_tasks().await;
2519 assert_eq!(cache.get(&"a").await, Some(alice));
2520 assert_eq!(cache.get(&"b").await, Some(bob));
2521 assert_eq!(cache.get(&"d").await, None);
2522 assert!(cache.contains_key(&"a"));
2523 assert!(cache.contains_key(&"b"));
2524 assert!(!cache.contains_key(&"d"));
2525
2526 // Verify the sizes.
2527 assert_eq!(cache.entry_count(), 2);
2528 assert_eq!(cache.weighted_size(), 25);
2529
2530 verify_notification_vec(&cache, actual, &expected).await;
2531 assert!(cache.key_locks_map_is_empty());
2532 }
2533
2534 #[tokio::test]
2535 async fn basic_multi_async_tasks() {
2536 let num_tasks = 2;
2537 let num_threads = 2;
2538
2539 let cache = Cache::new(100);
2540 let barrier = Arc::new(Barrier::new(num_tasks + num_threads as usize));
2541
2542 let tasks = (0..num_tasks)
2543 .map(|id| {
2544 let cache = cache.clone();
2545 let barrier = Arc::clone(&barrier);
2546
2547 tokio::spawn(async move {
2548 barrier.wait().await;
2549
2550 cache.insert(10, format!("{id}-100")).await;
2551 cache.get(&10).await;
2552 cache.insert(20, format!("{id}-200")).await;
2553 cache.invalidate(&10).await;
2554 })
2555 })
2556 .collect::<Vec<_>>();
2557
2558 let threads = (0..num_threads)
2559 .map(|id| {
2560 let cache = cache.clone();
2561 let barrier = Arc::clone(&barrier);
2562 let rt = tokio::runtime::Handle::current();
2563
2564 std::thread::spawn(move || {
2565 rt.block_on(barrier.wait());
2566
2567 rt.block_on(cache.insert(10, format!("{id}-100")));
2568 rt.block_on(cache.get(&10));
2569 rt.block_on(cache.insert(20, format!("{id}-200")));
2570 rt.block_on(cache.invalidate(&10));
2571 })
2572 })
2573 .collect::<Vec<_>>();
2574
2575 let _ = futures_util::future::join_all(tasks).await;
2576 threads.into_iter().for_each(|t| t.join().unwrap());
2577
2578 assert!(cache.get(&10).await.is_none());
2579 assert!(cache.get(&20).await.is_some());
2580 assert!(!cache.contains_key(&10));
2581 assert!(cache.contains_key(&20));
2582 }
2583
2584 #[tokio::test]
2585 async fn invalidate_all() {
2586 // The following `Vec`s will hold actual and expected notifications.
2587 let actual = Arc::new(Mutex::new(Vec::new()));
2588 let mut expected = Vec::new();
2589
2590 // Create an eviction listener.
2591 let a1 = Arc::clone(&actual);
2592 let listener = move |k, v, cause| -> ListenerFuture {
2593 let a2 = Arc::clone(&a1);
2594 async move {
2595 a2.lock().await.push((k, v, cause));
2596 }
2597 .boxed()
2598 };
2599
2600 // Create a cache with the eviction listener.
2601 let mut cache = Cache::builder()
2602 .max_capacity(100)
2603 .async_eviction_listener(listener)
2604 .build();
2605 cache.reconfigure_for_testing().await;
2606
2607 // Make the cache exterior immutable.
2608 let cache = cache;
2609
2610 cache.insert("a", "alice").await;
2611 cache.insert("b", "bob").await;
2612 cache.insert("c", "cindy").await;
2613 assert_eq!(cache.get(&"a").await, Some("alice"));
2614 assert_eq!(cache.get(&"b").await, Some("bob"));
2615 assert_eq!(cache.get(&"c").await, Some("cindy"));
2616 assert!(cache.contains_key(&"a"));
2617 assert!(cache.contains_key(&"b"));
2618 assert!(cache.contains_key(&"c"));
2619
2620 // `cache.run_pending_tasks().await` is no longer needed here before invalidating. The last
2621 // modified timestamp of the entries were updated when they were inserted.
2622 // https://github.com/moka-rs/moka/issues/155
2623
2624 cache.invalidate_all();
2625 expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
2626 expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2627 expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
2628 cache.run_pending_tasks().await;
2629
2630 cache.insert("d", "david").await;
2631 cache.run_pending_tasks().await;
2632
2633 assert!(cache.get(&"a").await.is_none());
2634 assert!(cache.get(&"b").await.is_none());
2635 assert!(cache.get(&"c").await.is_none());
2636 assert_eq!(cache.get(&"d").await, Some("david"));
2637 assert!(!cache.contains_key(&"a"));
2638 assert!(!cache.contains_key(&"b"));
2639 assert!(!cache.contains_key(&"c"));
2640 assert!(cache.contains_key(&"d"));
2641
2642 verify_notification_vec(&cache, actual, &expected).await;
2643 }
2644
2645 // This test is for https://github.com/moka-rs/moka/issues/155
2646 #[tokio::test]
2647 async fn invalidate_all_without_running_pending_tasks() {
2648 let cache = Cache::new(1024);
2649
2650 assert_eq!(cache.get(&0).await, None);
2651 cache.insert(0, 1).await;
2652 assert_eq!(cache.get(&0).await, Some(1));
2653
2654 cache.invalidate_all();
2655 assert_eq!(cache.get(&0).await, None);
2656 }
2657
2658 #[tokio::test]
2659 async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
2660 use std::collections::HashSet;
2661
2662 // The following `Vec`s will hold actual and expected notifications.
2663 let actual = Arc::new(Mutex::new(Vec::new()));
2664 let mut expected = Vec::new();
2665
2666 // Create an eviction listener.
2667 let a1 = Arc::clone(&actual);
2668 let listener = move |k, v, cause| -> ListenerFuture {
2669 let a2 = Arc::clone(&a1);
2670 async move {
2671 a2.lock().await.push((k, v, cause));
2672 }
2673 .boxed()
2674 };
2675
2676 let (clock, mock) = Clock::mock();
2677
2678 // Create a cache with the eviction listener.
2679 let mut cache = Cache::builder()
2680 .max_capacity(100)
2681 .support_invalidation_closures()
2682 .async_eviction_listener(listener)
2683 .clock(clock)
2684 .build();
2685 cache.reconfigure_for_testing().await;
2686
2687 // Make the cache exterior immutable.
2688 let cache = cache;
2689
2690 cache.insert(0, "alice").await;
2691 cache.insert(1, "bob").await;
2692 cache.insert(2, "alex").await;
2693 cache.run_pending_tasks().await;
2694
2695 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2696 cache.run_pending_tasks().await;
2697
2698 assert_eq!(cache.get(&0).await, Some("alice"));
2699 assert_eq!(cache.get(&1).await, Some("bob"));
2700 assert_eq!(cache.get(&2).await, Some("alex"));
2701 assert!(cache.contains_key(&0));
2702 assert!(cache.contains_key(&1));
2703 assert!(cache.contains_key(&2));
2704
2705 let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
2706 cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
2707 assert_eq!(cache.invalidation_predicate_count(), 1);
2708 expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
2709 expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
2710
2711 mock.increment(Duration::from_secs(5)); // 10 secs from the start.
2712
2713 cache.insert(3, "alice").await;
2714
2715 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2716 cache.run_pending_tasks().await; // To submit the invalidation task.
2717 std::thread::sleep(Duration::from_millis(200));
2718 cache.run_pending_tasks().await; // To process the task result.
2719 std::thread::sleep(Duration::from_millis(200));
2720
2721 assert!(cache.get(&0).await.is_none());
2722 assert!(cache.get(&2).await.is_none());
2723 assert_eq!(cache.get(&1).await, Some("bob"));
2724 // This should survive as it was inserted after calling invalidate_entries_if.
2725 assert_eq!(cache.get(&3).await, Some("alice"));
2726
2727 assert!(!cache.contains_key(&0));
2728 assert!(cache.contains_key(&1));
2729 assert!(!cache.contains_key(&2));
2730 assert!(cache.contains_key(&3));
2731
2732 assert_eq!(cache.entry_count(), 2);
2733 assert_eq!(cache.invalidation_predicate_count(), 0);
2734
2735 mock.increment(Duration::from_secs(5)); // 15 secs from the start.
2736
2737 cache.invalidate_entries_if(|_k, &v| v == "alice")?;
2738 cache.invalidate_entries_if(|_k, &v| v == "bob")?;
2739 assert_eq!(cache.invalidation_predicate_count(), 2);
2740 // key 1 was inserted before key 3.
2741 expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
2742 expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
2743
2744 // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2745 cache.run_pending_tasks().await; // To submit the invalidation task.
2746 std::thread::sleep(Duration::from_millis(200));
2747 cache.run_pending_tasks().await; // To process the task result.
2748 std::thread::sleep(Duration::from_millis(200));
2749
2750 assert!(cache.get(&1).await.is_none());
2751 assert!(cache.get(&3).await.is_none());
2752
2753 assert!(!cache.contains_key(&1));
2754 assert!(!cache.contains_key(&3));
2755
2756 assert_eq!(cache.entry_count(), 0);
2757 assert_eq!(cache.invalidation_predicate_count(), 0);
2758
2759 verify_notification_vec(&cache, actual, &expected).await;
2760
2761 Ok(())
2762 }
2763
2764 #[tokio::test]
2765 async fn time_to_live() {
2766 // The following `Vec`s will hold actual and expected notifications.
2767 let actual = Arc::new(Mutex::new(Vec::new()));
2768 let mut expected = Vec::new();
2769
2770 // Create an eviction listener.
2771 let a1 = Arc::clone(&actual);
2772 let listener = move |k, v, cause| -> ListenerFuture {
2773 let a2 = Arc::clone(&a1);
2774 async move {
2775 a2.lock().await.push((k, v, cause));
2776 }
2777 .boxed()
2778 };
2779
2780 let (clock, mock) = Clock::mock();
2781
2782 // Create a cache with the eviction listener.
2783 let mut cache = Cache::builder()
2784 .max_capacity(100)
2785 .time_to_live(Duration::from_secs(10))
2786 .async_eviction_listener(listener)
2787 .clock(clock)
2788 .build();
2789 cache.reconfigure_for_testing().await;
2790
2791 // Make the cache exterior immutable.
2792 let cache = cache;
2793
2794 cache.insert("a", "alice").await;
2795 cache.run_pending_tasks().await;
2796
2797 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2798 cache.run_pending_tasks().await;
2799
2800 assert_eq!(cache.get(&"a").await, Some("alice"));
2801 assert!(cache.contains_key(&"a"));
2802
2803 mock.increment(Duration::from_secs(5)); // 10 secs.
2804 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2805 assert_eq!(cache.get(&"a").await, None);
2806 assert!(!cache.contains_key(&"a"));
2807
2808 assert_eq!(cache.iter().count(), 0);
2809
2810 cache.run_pending_tasks().await;
2811 assert!(cache.is_table_empty());
2812
2813 cache.insert("b", "bob").await;
2814 cache.run_pending_tasks().await;
2815
2816 assert_eq!(cache.entry_count(), 1);
2817
2818 mock.increment(Duration::from_secs(5)); // 15 secs.
2819 cache.run_pending_tasks().await;
2820
2821 assert_eq!(cache.get(&"b").await, Some("bob"));
2822 assert!(cache.contains_key(&"b"));
2823 assert_eq!(cache.entry_count(), 1);
2824
2825 cache.insert("b", "bill").await;
2826 expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2827 cache.run_pending_tasks().await;
2828
2829 mock.increment(Duration::from_secs(5)); // 20 secs
2830 cache.run_pending_tasks().await;
2831
2832 assert_eq!(cache.get(&"b").await, Some("bill"));
2833 assert!(cache.contains_key(&"b"));
2834 assert_eq!(cache.entry_count(), 1);
2835
2836 mock.increment(Duration::from_secs(5)); // 25 secs
2837 expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2838
2839 assert_eq!(cache.get(&"a").await, None);
2840 assert_eq!(cache.get(&"b").await, None);
2841 assert!(!cache.contains_key(&"a"));
2842 assert!(!cache.contains_key(&"b"));
2843
2844 assert_eq!(cache.iter().count(), 0);
2845
2846 cache.run_pending_tasks().await;
2847 assert!(cache.is_table_empty());
2848
2849 verify_notification_vec(&cache, actual, &expected).await;
2850 }
2851
2852 #[tokio::test]
2853 async fn time_to_idle() {
2854 // The following `Vec`s will hold actual and expected notifications.
2855 let actual = Arc::new(Mutex::new(Vec::new()));
2856 let mut expected = Vec::new();
2857
2858 // Create an eviction listener.
2859 let a1 = Arc::clone(&actual);
2860 let listener = move |k, v, cause| -> ListenerFuture {
2861 let a2 = Arc::clone(&a1);
2862 async move {
2863 a2.lock().await.push((k, v, cause));
2864 }
2865 .boxed()
2866 };
2867
2868 let (clock, mock) = Clock::mock();
2869
2870 // Create a cache with the eviction listener.
2871 let mut cache = Cache::builder()
2872 .max_capacity(100)
2873 .time_to_idle(Duration::from_secs(10))
2874 .async_eviction_listener(listener)
2875 .clock(clock)
2876 .build();
2877 cache.reconfigure_for_testing().await;
2878
2879 // Make the cache exterior immutable.
2880 let cache = cache;
2881
2882 cache.insert("a", "alice").await;
2883 cache.run_pending_tasks().await;
2884
2885 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2886 cache.run_pending_tasks().await;
2887
2888 assert_eq!(cache.get(&"a").await, Some("alice"));
2889
2890 mock.increment(Duration::from_secs(5)); // 10 secs.
2891 cache.run_pending_tasks().await;
2892
2893 cache.insert("b", "bob").await;
2894 cache.run_pending_tasks().await;
2895
2896 assert_eq!(cache.entry_count(), 2);
2897
2898 mock.increment(Duration::from_secs(2)); // 12 secs.
2899 cache.run_pending_tasks().await;
2900
2901 // contains_key does not reset the idle timer for the key.
2902 assert!(cache.contains_key(&"a"));
2903 assert!(cache.contains_key(&"b"));
2904 cache.run_pending_tasks().await;
2905
2906 assert_eq!(cache.entry_count(), 2);
2907
2908 mock.increment(Duration::from_secs(3)); // 15 secs.
2909 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2910
2911 assert_eq!(cache.get(&"a").await, None);
2912 assert_eq!(cache.get(&"b").await, Some("bob"));
2913 assert!(!cache.contains_key(&"a"));
2914 assert!(cache.contains_key(&"b"));
2915
2916 assert_eq!(cache.iter().count(), 1);
2917
2918 cache.run_pending_tasks().await;
2919 assert_eq!(cache.entry_count(), 1);
2920
2921 mock.increment(Duration::from_secs(10)); // 25 secs
2922 expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2923
2924 assert_eq!(cache.get(&"a").await, None);
2925 assert_eq!(cache.get(&"b").await, None);
2926 assert!(!cache.contains_key(&"a"));
2927 assert!(!cache.contains_key(&"b"));
2928
2929 assert_eq!(cache.iter().count(), 0);
2930
2931 cache.run_pending_tasks().await;
2932 assert!(cache.is_table_empty());
2933
2934 verify_notification_vec(&cache, actual, &expected).await;
2935 }
2936
2937 // https://github.com/moka-rs/moka/issues/359
2938 #[tokio::test]
2939 async fn ensure_access_time_is_updated_immediately_after_read() {
2940 let (clock, mock) = Clock::mock();
2941 let mut cache = Cache::builder()
2942 .max_capacity(10)
2943 .time_to_idle(Duration::from_secs(5))
2944 .clock(clock)
2945 .build();
2946 cache.reconfigure_for_testing().await;
2947 // Make the cache exterior immutable.
2948 let cache = cache;
2949
2950 cache.insert(1, 1).await;
2951
2952 mock.increment(Duration::from_secs(4));
2953 assert_eq!(cache.get(&1).await, Some(1));
2954
2955 mock.increment(Duration::from_secs(2));
2956 assert_eq!(cache.get(&1).await, Some(1));
2957 cache.run_pending_tasks().await;
2958 assert_eq!(cache.get(&1).await, Some(1));
2959 }
2960
2961 #[tokio::test]
2962 async fn time_to_live_by_expiry_type() {
2963 // The following `Vec`s will hold actual and expected notifications.
2964 let actual = Arc::new(Mutex::new(Vec::new()));
2965 let mut expected = Vec::new();
2966
2967 // Define an expiry type.
2968 struct MyExpiry {
2969 counters: Arc<ExpiryCallCounters>,
2970 }
2971
2972 impl MyExpiry {
2973 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2974 Self { counters }
2975 }
2976 }
2977
2978 impl Expiry<&str, &str> for MyExpiry {
2979 fn expire_after_create(
2980 &self,
2981 _key: &&str,
2982 _value: &&str,
2983 _current_time: StdInstant,
2984 ) -> Option<Duration> {
2985 self.counters.incl_actual_creations();
2986 Some(Duration::from_secs(10))
2987 }
2988
2989 fn expire_after_update(
2990 &self,
2991 _key: &&str,
2992 _value: &&str,
2993 _current_time: StdInstant,
2994 _current_duration: Option<Duration>,
2995 ) -> Option<Duration> {
2996 self.counters.incl_actual_updates();
2997 Some(Duration::from_secs(10))
2998 }
2999 }
3000
3001 // Create an eviction listener.
3002 let a1 = Arc::clone(&actual);
3003 let listener = move |k, v, cause| -> ListenerFuture {
3004 let a2 = Arc::clone(&a1);
3005 async move {
3006 a2.lock().await.push((k, v, cause));
3007 }
3008 .boxed()
3009 };
3010
3011 // Create expiry counters and the expiry.
3012 let expiry_counters = Arc::new(ExpiryCallCounters::default());
3013 let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
3014
3015 let (clock, mock) = Clock::mock();
3016
3017 // Create a cache with the expiry and eviction listener.
3018 let mut cache = Cache::builder()
3019 .max_capacity(100)
3020 .expire_after(expiry)
3021 .async_eviction_listener(listener)
3022 .clock(clock)
3023 .build();
3024 cache.reconfigure_for_testing().await;
3025
3026 // Make the cache exterior immutable.
3027 let cache = cache;
3028
3029 cache.insert("a", "alice").await;
3030 expiry_counters.incl_expected_creations();
3031 cache.run_pending_tasks().await;
3032
3033 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
3034 cache.run_pending_tasks().await;
3035
3036 assert_eq!(cache.get(&"a").await, Some("alice"));
3037 assert!(cache.contains_key(&"a"));
3038
3039 mock.increment(Duration::from_secs(5)); // 10 secs.
3040 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
3041 assert_eq!(cache.get(&"a").await, None);
3042 assert!(!cache.contains_key(&"a"));
3043
3044 assert_eq!(cache.iter().count(), 0);
3045
3046 cache.run_pending_tasks().await;
3047 assert!(cache.is_table_empty());
3048
3049 cache.insert("b", "bob").await;
3050 expiry_counters.incl_expected_creations();
3051 cache.run_pending_tasks().await;
3052
3053 assert_eq!(cache.entry_count(), 1);
3054
3055 mock.increment(Duration::from_secs(5)); // 15 secs.
3056 cache.run_pending_tasks().await;
3057
3058 assert_eq!(cache.get(&"b").await, Some("bob"));
3059 assert!(cache.contains_key(&"b"));
3060 assert_eq!(cache.entry_count(), 1);
3061
3062 cache.insert("b", "bill").await;
3063 expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
3064 expiry_counters.incl_expected_updates();
3065 cache.run_pending_tasks().await;
3066
3067 mock.increment(Duration::from_secs(5)); // 20 secs
3068 cache.run_pending_tasks().await;
3069
3070 assert_eq!(cache.get(&"b").await, Some("bill"));
3071 assert!(cache.contains_key(&"b"));
3072 assert_eq!(cache.entry_count(), 1);
3073
3074 mock.increment(Duration::from_secs(5)); // 25 secs
3075 expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
3076
3077 assert_eq!(cache.get(&"a").await, None);
3078 assert_eq!(cache.get(&"b").await, None);
3079 assert!(!cache.contains_key(&"a"));
3080 assert!(!cache.contains_key(&"b"));
3081
3082 assert_eq!(cache.iter().count(), 0);
3083
3084 cache.run_pending_tasks().await;
3085 assert!(cache.is_table_empty());
3086
3087 expiry_counters.verify();
3088 verify_notification_vec(&cache, actual, &expected).await;
3089 }
3090
3091 #[tokio::test]
3092 async fn time_to_idle_by_expiry_type() {
3093 // The following `Vec`s will hold actual and expected notifications.
3094 let actual = Arc::new(Mutex::new(Vec::new()));
3095 let mut expected = Vec::new();
3096
3097 // Define an expiry type.
3098 struct MyExpiry {
3099 counters: Arc<ExpiryCallCounters>,
3100 }
3101
3102 impl MyExpiry {
3103 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
3104 Self { counters }
3105 }
3106 }
3107
3108 impl Expiry<&str, &str> for MyExpiry {
3109 fn expire_after_read(
3110 &self,
3111 _key: &&str,
3112 _value: &&str,
3113 _current_time: StdInstant,
3114 _current_duration: Option<Duration>,
3115 _last_modified_at: StdInstant,
3116 ) -> Option<Duration> {
3117 self.counters.incl_actual_reads();
3118 Some(Duration::from_secs(10))
3119 }
3120 }
3121
3122 // Create an eviction listener.
3123 let a1 = Arc::clone(&actual);
3124 let listener = move |k, v, cause| -> ListenerFuture {
3125 let a2 = Arc::clone(&a1);
3126 async move {
3127 a2.lock().await.push((k, v, cause));
3128 }
3129 .boxed()
3130 };
3131
3132 // Create expiry counters and the expiry.
3133 let expiry_counters = Arc::new(ExpiryCallCounters::default());
3134 let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
3135
3136 let (clock, mock) = Clock::mock();
3137
3138 // Create a cache with the expiry and eviction listener.
3139 let mut cache = Cache::builder()
3140 .max_capacity(100)
3141 .expire_after(expiry)
3142 .async_eviction_listener(listener)
3143 .clock(clock)
3144 .build();
3145 cache.reconfigure_for_testing().await;
3146
3147 // Make the cache exterior immutable.
3148 let cache = cache;
3149
3150 cache.insert("a", "alice").await;
3151 cache.run_pending_tasks().await;
3152
3153 mock.increment(Duration::from_secs(5)); // 5 secs from the start.
3154 cache.run_pending_tasks().await;
3155
3156 assert_eq!(cache.get(&"a").await, Some("alice"));
3157 expiry_counters.incl_expected_reads();
3158
3159 mock.increment(Duration::from_secs(5)); // 10 secs.
3160 cache.run_pending_tasks().await;
3161
3162 cache.insert("b", "bob").await;
3163 cache.run_pending_tasks().await;
3164
3165 assert_eq!(cache.entry_count(), 2);
3166
3167 mock.increment(Duration::from_secs(2)); // 12 secs.
3168 cache.run_pending_tasks().await;
3169
3170 // contains_key does not reset the idle timer for the key.
3171 assert!(cache.contains_key(&"a"));
3172 assert!(cache.contains_key(&"b"));
3173 cache.run_pending_tasks().await;
3174
3175 assert_eq!(cache.entry_count(), 2);
3176
3177 mock.increment(Duration::from_secs(3)); // 15 secs.
3178 expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
3179
3180 assert_eq!(cache.get(&"a").await, None);
3181 assert_eq!(cache.get(&"b").await, Some("bob"));
3182 expiry_counters.incl_expected_reads();
3183 assert!(!cache.contains_key(&"a"));
3184 assert!(cache.contains_key(&"b"));
3185
3186 assert_eq!(cache.iter().count(), 1);
3187
3188 cache.run_pending_tasks().await;
3189 assert_eq!(cache.entry_count(), 1);
3190
3191 mock.increment(Duration::from_secs(10)); // 25 secs
3192 expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
3193
3194 assert_eq!(cache.get(&"a").await, None);
3195 assert_eq!(cache.get(&"b").await, None);
3196 assert!(!cache.contains_key(&"a"));
3197 assert!(!cache.contains_key(&"b"));
3198
3199 assert_eq!(cache.iter().count(), 0);
3200
3201 cache.run_pending_tasks().await;
3202 assert!(cache.is_table_empty());
3203
3204 expiry_counters.verify();
3205 verify_notification_vec(&cache, actual, &expected).await;
3206 }
3207
3208 /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
3209 /// only when the key was already present in the cache.
3210 #[tokio::test]
3211 async fn test_expiry_using_get_with() {
3212 // Define an expiry type, which always return `None`.
3213 struct NoExpiry {
3214 counters: Arc<ExpiryCallCounters>,
3215 }
3216
3217 impl NoExpiry {
3218 fn new(counters: Arc<ExpiryCallCounters>) -> Self {
3219 Self { counters }
3220 }
3221 }
3222
3223 impl Expiry<&str, &str> for NoExpiry {
3224 fn expire_after_create(
3225 &self,
3226 _key: &&str,
3227 _value: &&str,
3228 _current_time: StdInstant,
3229 ) -> Option<Duration> {
3230 self.counters.incl_actual_creations();
3231 None
3232 }
3233
3234 fn expire_after_read(
3235 &self,
3236 _key: &&str,
3237 _value: &&str,
3238 _current_time: StdInstant,
3239 _current_duration: Option<Duration>,
3240 _last_modified_at: StdInstant,
3241 ) -> Option<Duration> {
3242 self.counters.incl_actual_reads();
3243 None
3244 }
3245
3246 fn expire_after_update(
3247 &self,
3248 _key: &&str,
3249 _value: &&str,
3250 _current_time: StdInstant,
3251 _current_duration: Option<Duration>,
3252 ) -> Option<Duration> {
3253 unreachable!("The `expire_after_update()` method should not be called.");
3254 }
3255 }
3256
3257 // Create expiry counters and the expiry.
3258 let expiry_counters = Arc::new(ExpiryCallCounters::default());
3259 let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
3260
3261 // Create a cache with the expiry and eviction listener.
3262 let mut cache = Cache::builder()
3263 .max_capacity(100)
3264 .expire_after(expiry)
3265 .build();
3266 cache.reconfigure_for_testing().await;
3267
3268 // Make the cache exterior immutable.
3269 let cache = cache;
3270
3271 // The key is not present.
3272 cache.get_with("a", async { "alice" }).await;
3273 expiry_counters.incl_expected_creations();
3274 cache.run_pending_tasks().await;
3275
3276 // The key is present.
3277 cache.get_with("a", async { "alex" }).await;
3278 expiry_counters.incl_expected_reads();
3279 cache.run_pending_tasks().await;
3280
3281 // The key is not present.
3282 cache.invalidate("a").await;
3283 cache.get_with("a", async { "amanda" }).await;
3284 expiry_counters.incl_expected_creations();
3285 cache.run_pending_tasks().await;
3286
3287 expiry_counters.verify();
3288 }
3289
3290 // https://github.com/moka-rs/moka/issues/345
3291 #[tokio::test]
3292 async fn test_race_between_updating_entry_and_processing_its_write_ops() {
3293 let (clock, mock) = Clock::mock();
3294 let cache = Cache::builder()
3295 .max_capacity(2)
3296 .time_to_idle(Duration::from_secs(1))
3297 .clock(clock)
3298 .build();
3299
3300 cache.insert("a", "alice").await;
3301 cache.insert("b", "bob").await;
3302 cache.insert("c", "cathy").await; // c1
3303 mock.increment(Duration::from_secs(2));
3304
3305 // The following `insert` will do the followings:
3306 // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
3307 // 2. Runs the pending tasks implicitly.
3308 // (1) "a" will be admitted.
3309 // (2) "b" will be admitted.
3310 // (3) c1 will be evicted by size constraint.
3311 // (4) "a" will be evicted due to expiration.
3312 // (5) "b" will be evicted due to expiration.
3313 // 3. Send its `WriteOp` log to the channel.
3314 cache.insert("c", "cindy").await; // c2
3315
3316 // Remove "c" (c2) from the cht.
3317 assert_eq!(cache.remove(&"c").await, Some("cindy")); // c-remove
3318
3319 mock.increment(Duration::from_secs(2));
3320
3321 // The following `run_pending_tasks` will do the followings:
3322 // 1. Admits "c" (c2) to the cache. (Create a node in the LRU deque)
3323 // 2. Because of c-remove, removes c2's node from the LRU deque.
3324 cache.run_pending_tasks().await;
3325 assert_eq!(cache.entry_count(), 0);
3326 }
3327
3328 #[tokio::test]
3329 async fn test_race_between_recreating_entry_and_processing_its_write_ops() {
3330 let cache = Cache::builder().max_capacity(2).build();
3331
3332 cache.insert('a', "a").await;
3333 cache.insert('b', "b").await;
3334 cache.run_pending_tasks().await;
3335
3336 cache.insert('c', "c1").await; // (a) `EntryInfo` 1, gen: 1
3337 assert!(cache.remove(&'a').await.is_some()); // (b)
3338 assert!(cache.remove(&'b').await.is_some()); // (c)
3339 assert!(cache.remove(&'c').await.is_some()); // (d) `EntryInfo` 1, gen: 2
3340 cache.insert('c', "c2").await; // (e) `EntryInfo` 2, gen: 1
3341
3342 // Now the `write_op_ch` channel contains the following `WriteOp`s:
3343 //
3344 // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
3345 // - 1: (b) remove "a"
3346 // - 2: (c) remove "b"
3347 // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
3348 // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
3349 //
3350 // 0 for "c1" is going to be rejected because the cache is full. Let's ensure
3351 // processing 0 must not remove "c2" from the concurrent hash table. (Their
3352 // gen are the same, but `EntryInfo`s are different)
3353 cache.run_pending_tasks().await;
3354 assert_eq!(cache.get(&'c').await, Some("c2"));
3355 }
3356
3357 #[tokio::test]
3358 async fn test_iter() {
3359 const NUM_KEYS: usize = 50;
3360
3361 fn make_value(key: usize) -> String {
3362 format!("val: {key}")
3363 }
3364
3365 let cache = Cache::builder()
3366 .max_capacity(100)
3367 .time_to_idle(Duration::from_secs(10))
3368 .build();
3369
3370 for key in 0..NUM_KEYS {
3371 cache.insert(key, make_value(key)).await;
3372 }
3373
3374 let mut key_set = std::collections::HashSet::new();
3375
3376 for (key, value) in &cache {
3377 assert_eq!(value, make_value(*key));
3378
3379 key_set.insert(*key);
3380 }
3381
3382 // Ensure there are no missing or duplicate keys in the iteration.
3383 assert_eq!(key_set.len(), NUM_KEYS);
3384 }
3385
3386 /// Runs 16 async tasks at the same time and ensures no deadlock occurs.
3387 ///
3388 /// - Eight of the task will update key-values in the cache.
3389 /// - Eight others will iterate the cache.
3390 ///
3391 #[tokio::test]
3392 async fn test_iter_multi_async_tasks() {
3393 use std::collections::HashSet;
3394
3395 const NUM_KEYS: usize = 1024;
3396 const NUM_TASKS: usize = 16;
3397
3398 fn make_value(key: usize) -> String {
3399 format!("val: {key}")
3400 }
3401
3402 let cache = Cache::builder()
3403 .max_capacity(2048)
3404 .time_to_idle(Duration::from_secs(10))
3405 .build();
3406
3407 // Initialize the cache.
3408 for key in 0..NUM_KEYS {
3409 cache.insert(key, make_value(key)).await;
3410 }
3411
3412 let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
3413 let write_lock = rw_lock.write().await;
3414
3415 let tasks = (0..NUM_TASKS)
3416 .map(|n| {
3417 let cache = cache.clone();
3418 let rw_lock = Arc::clone(&rw_lock);
3419
3420 if n % 2 == 0 {
3421 // This thread will update the cache.
3422 tokio::spawn(async move {
3423 let read_lock = rw_lock.read().await;
3424 for key in 0..NUM_KEYS {
3425 // TODO: Update keys in a random order?
3426 cache.insert(key, make_value(key)).await;
3427 }
3428 std::mem::drop(read_lock);
3429 })
3430 } else {
3431 // This thread will iterate the cache.
3432 tokio::spawn(async move {
3433 let read_lock = rw_lock.read().await;
3434 let mut key_set = HashSet::new();
3435 // let mut key_count = 0usize;
3436 for (key, value) in &cache {
3437 assert_eq!(value, make_value(*key));
3438 key_set.insert(*key);
3439 // key_count += 1;
3440 }
3441 // Ensure there are no missing or duplicate keys in the iteration.
3442 assert_eq!(key_set.len(), NUM_KEYS);
3443 std::mem::drop(read_lock);
3444 })
3445 }
3446 })
3447 .collect::<Vec<_>>();
3448
3449 // Let these threads to run by releasing the write lock.
3450 std::mem::drop(write_lock);
3451
3452 let _ = futures_util::future::join_all(tasks).await;
3453
3454 // Ensure there are no missing or duplicate keys in the iteration.
3455 let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
3456 assert_eq!(key_set.len(), NUM_KEYS);
3457 }
3458
3459 #[tokio::test]
3460 async fn get_with() {
3461 let cache = Cache::new(100);
3462 const KEY: u32 = 0;
3463
3464 // This test will run five async tasks:
3465 //
3466 // Task1 will be the first task to call `get_with` for a key, so its async
3467 // block will be evaluated and then a &str value "task1" will be inserted to
3468 // the cache.
3469 let task1 = {
3470 let cache1 = cache.clone();
3471 async move {
3472 // Call `get_with` immediately.
3473 let v = cache1
3474 .get_with(KEY, async {
3475 // Wait for 300 ms and return a &str value.
3476 sleep(Duration::from_millis(300)).await;
3477 "task1"
3478 })
3479 .await;
3480 assert_eq!(v, "task1");
3481 }
3482 };
3483
3484 // Task2 will be the second task to call `get_with` for the same key, so its
3485 // async block will not be evaluated. Once task1's async block finishes, it
3486 // will get the value inserted by task1's async block.
3487 let task2 = {
3488 let cache2 = cache.clone();
3489 async move {
3490 // Wait for 100 ms before calling `get_with`.
3491 sleep(Duration::from_millis(100)).await;
3492 let v = cache2.get_with(KEY, async { unreachable!() }).await;
3493 assert_eq!(v, "task1");
3494 }
3495 };
3496
3497 // Task3 will be the third task to call `get_with` for the same key. By the
3498 // time it calls, task1's async block should have finished already and the
3499 // value should be already inserted to the cache. So its async block will not
3500 // be evaluated and will get the value inserted by task1's async block
3501 // immediately.
3502 let task3 = {
3503 let cache3 = cache.clone();
3504 async move {
3505 // Wait for 400 ms before calling `get_with`.
3506 sleep(Duration::from_millis(400)).await;
3507 let v = cache3.get_with(KEY, async { unreachable!() }).await;
3508 assert_eq!(v, "task1");
3509 }
3510 };
3511
3512 // Task4 will call `get` for the same key. It will call when task1's async
3513 // block is still running, so it will get none for the key.
3514 let task4 = {
3515 let cache4 = cache.clone();
3516 async move {
3517 // Wait for 200 ms before calling `get`.
3518 sleep(Duration::from_millis(200)).await;
3519 let maybe_v = cache4.get(&KEY).await;
3520 assert!(maybe_v.is_none());
3521 }
3522 };
3523
3524 // Task5 will call `get` for the same key. It will call after task1's async
3525 // block finished, so it will get the value insert by task1's async block.
3526 let task5 = {
3527 let cache5 = cache.clone();
3528 async move {
3529 // Wait for 400 ms before calling `get`.
3530 sleep(Duration::from_millis(400)).await;
3531 let maybe_v = cache5.get(&KEY).await;
3532 assert_eq!(maybe_v, Some("task1"));
3533 }
3534 };
3535
3536 futures_util::join!(task1, task2, task3, task4, task5);
3537
3538 assert!(cache.is_waiter_map_empty());
3539 }
3540
3541 #[tokio::test]
3542 async fn get_with_by_ref() {
3543 let cache = Cache::new(100);
3544 const KEY: &u32 = &0;
3545
3546 // This test will run five async tasks:
3547 //
3548 // Task1 will be the first task to call `get_with_by_ref` for a key, so its async
3549 // block will be evaluated and then a &str value "task1" will be inserted to
3550 // the cache.
3551 let task1 = {
3552 let cache1 = cache.clone();
3553 async move {
3554 // Call `get_with_by_ref` immediately.
3555 let v = cache1
3556 .get_with_by_ref(KEY, async {
3557 // Wait for 300 ms and return a &str value.
3558 sleep(Duration::from_millis(300)).await;
3559 "task1"
3560 })
3561 .await;
3562 assert_eq!(v, "task1");
3563 }
3564 };
3565
3566 // Task2 will be the second task to call `get_with_by_ref` for the same key, so its
3567 // async block will not be evaluated. Once task1's async block finishes, it
3568 // will get the value inserted by task1's async block.
3569 let task2 = {
3570 let cache2 = cache.clone();
3571 async move {
3572 // Wait for 100 ms before calling `get_with_by_ref`.
3573 sleep(Duration::from_millis(100)).await;
3574 let v = cache2.get_with_by_ref(KEY, async { unreachable!() }).await;
3575 assert_eq!(v, "task1");
3576 }
3577 };
3578
3579 // Task3 will be the third task to call `get_with_by_ref` for the same key. By the
3580 // time it calls, task1's async block should have finished already and the
3581 // value should be already inserted to the cache. So its async block will not
3582 // be evaluated and will get the value inserted by task1's async block
3583 // immediately.
3584 let task3 = {
3585 let cache3 = cache.clone();
3586 async move {
3587 // Wait for 400 ms before calling `get_with_by_ref`.
3588 sleep(Duration::from_millis(400)).await;
3589 let v = cache3.get_with_by_ref(KEY, async { unreachable!() }).await;
3590 assert_eq!(v, "task1");
3591 }
3592 };
3593
3594 // Task4 will call `get` for the same key. It will call when task1's async
3595 // block is still running, so it will get none for the key.
3596 let task4 = {
3597 let cache4 = cache.clone();
3598 async move {
3599 // Wait for 200 ms before calling `get`.
3600 sleep(Duration::from_millis(200)).await;
3601 let maybe_v = cache4.get(KEY).await;
3602 assert!(maybe_v.is_none());
3603 }
3604 };
3605
3606 // Task5 will call `get` for the same key. It will call after task1's async
3607 // block finished, so it will get the value insert by task1's async block.
3608 let task5 = {
3609 let cache5 = cache.clone();
3610 async move {
3611 // Wait for 400 ms before calling `get`.
3612 sleep(Duration::from_millis(400)).await;
3613 let maybe_v = cache5.get(KEY).await;
3614 assert_eq!(maybe_v, Some("task1"));
3615 }
3616 };
3617
3618 futures_util::join!(task1, task2, task3, task4, task5);
3619
3620 assert!(cache.is_waiter_map_empty());
3621 }
3622
3623 #[tokio::test]
3624 async fn entry_or_insert_with_if() {
3625 let cache = Cache::new(100);
3626 const KEY: u32 = 0;
3627
3628 // This test will run seven async tasks:
3629 //
3630 // Task1 will be the first task to call `or_insert_with_if` for a key, so its
3631 // async block will be evaluated and then a &str value "task1" will be
3632 // inserted to the cache.
3633 let task1 = {
3634 let cache1 = cache.clone();
3635 async move {
3636 // Call `or_insert_with_if` immediately.
3637 let entry = cache1
3638 .entry(KEY)
3639 .or_insert_with_if(
3640 async {
3641 // Wait for 300 ms and return a &str value.
3642 sleep(Duration::from_millis(300)).await;
3643 "task1"
3644 },
3645 |_v| unreachable!(),
3646 )
3647 .await;
3648 // Entry should be fresh because our async block should have been
3649 // evaluated.
3650 assert!(entry.is_fresh());
3651 assert_eq!(entry.into_value(), "task1");
3652 }
3653 };
3654
3655 // Task2 will be the second task to call `or_insert_with_if` for the same
3656 // key, so its async block will not be evaluated. Once task1's async block
3657 // finishes, it will get the value inserted by task1's async block.
3658 let task2 = {
3659 let cache2 = cache.clone();
3660 async move {
3661 // Wait for 100 ms before calling `or_insert_with_if`.
3662 sleep(Duration::from_millis(100)).await;
3663 let entry = cache2
3664 .entry(KEY)
3665 .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
3666 .await;
3667 // Entry should not be fresh because task1's async block should have
3668 // been evaluated instead of ours.
3669 assert!(!entry.is_fresh());
3670 assert_eq!(entry.into_value(), "task1");
3671 }
3672 };
3673
3674 // Task3 will be the third task to call `or_insert_with_if` for the same key.
3675 // By the time it calls, task1's async block should have finished already and
3676 // the value should be already inserted to the cache. Also task3's
3677 // `replace_if` closure returns `false`. So its async block will not be
3678 // evaluated and will get the value inserted by task1's async block
3679 // immediately.
3680 let task3 = {
3681 let cache3 = cache.clone();
3682 async move {
3683 // Wait for 350 ms before calling `or_insert_with_if`.
3684 sleep(Duration::from_millis(350)).await;
3685 let entry = cache3
3686 .entry(KEY)
3687 .or_insert_with_if(async { unreachable!() }, |v| {
3688 assert_eq!(v, &"task1");
3689 false
3690 })
3691 .await;
3692 assert!(!entry.is_fresh());
3693 assert_eq!(entry.into_value(), "task1");
3694 }
3695 };
3696
3697 // Task4 will be the fourth task to call `or_insert_with_if` for the same
3698 // key. The value should have been already inserted to the cache by task1.
3699 // However task4's `replace_if` closure returns `true`. So its async block
3700 // will be evaluated to replace the current value.
3701 let task4 = {
3702 let cache4 = cache.clone();
3703 async move {
3704 // Wait for 400 ms before calling `or_insert_with_if`.
3705 sleep(Duration::from_millis(400)).await;
3706 let entry = cache4
3707 .entry(KEY)
3708 .or_insert_with_if(async { "task4" }, |v| {
3709 assert_eq!(v, &"task1");
3710 true
3711 })
3712 .await;
3713 assert!(entry.is_fresh());
3714 assert_eq!(entry.into_value(), "task4");
3715 }
3716 };
3717
3718 // Task5 will call `get` for the same key. It will call when task1's async
3719 // block is still running, so it will get none for the key.
3720 let task5 = {
3721 let cache5 = cache.clone();
3722 async move {
3723 // Wait for 200 ms before calling `get`.
3724 sleep(Duration::from_millis(200)).await;
3725 let maybe_v = cache5.get(&KEY).await;
3726 assert!(maybe_v.is_none());
3727 }
3728 };
3729
3730 // Task6 will call `get` for the same key. It will call after task1's async
3731 // block finished, so it will get the value insert by task1's async block.
3732 let task6 = {
3733 let cache6 = cache.clone();
3734 async move {
3735 // Wait for 350 ms before calling `get`.
3736 sleep(Duration::from_millis(350)).await;
3737 let maybe_v = cache6.get(&KEY).await;
3738 assert_eq!(maybe_v, Some("task1"));
3739 }
3740 };
3741
3742 // Task7 will call `get` for the same key. It will call after task4's async
3743 // block finished, so it will get the value insert by task4's async block.
3744 let task7 = {
3745 let cache7 = cache.clone();
3746 async move {
3747 // Wait for 450 ms before calling `get`.
3748 sleep(Duration::from_millis(450)).await;
3749 let maybe_v = cache7.get(&KEY).await;
3750 assert_eq!(maybe_v, Some("task4"));
3751 }
3752 };
3753
3754 futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
3755
3756 assert!(cache.is_waiter_map_empty());
3757 }
3758
3759 #[tokio::test]
3760 async fn entry_by_ref_or_insert_with_if() {
3761 let cache = Cache::new(100);
3762 const KEY: &u32 = &0;
3763
3764 // This test will run seven async tasks:
3765 //
3766 // Task1 will be the first task to call `or_insert_with_if` for a key, so its
3767 // async block will be evaluated and then a &str value "task1" will be
3768 // inserted to the cache.
3769 let task1 = {
3770 let cache1 = cache.clone();
3771 async move {
3772 // Call `or_insert_with_if` immediately.
3773 let entry = cache1
3774 .entry_by_ref(KEY)
3775 .or_insert_with_if(
3776 async {
3777 // Wait for 300 ms and return a &str value.
3778 sleep(Duration::from_millis(300)).await;
3779 "task1"
3780 },
3781 |_v| unreachable!(),
3782 )
3783 .await;
3784 // Entry should be fresh because our async block should have been
3785 // evaluated.
3786 assert!(entry.is_fresh());
3787 assert_eq!(entry.into_value(), "task1");
3788 }
3789 };
3790
3791 // Task2 will be the second task to call `or_insert_with_if` for the same
3792 // key, so its async block will not be evaluated. Once task1's async block
3793 // finishes, it will get the value inserted by task1's async block.
3794 let task2 = {
3795 let cache2 = cache.clone();
3796 async move {
3797 // Wait for 100 ms before calling `or_insert_with_if`.
3798 sleep(Duration::from_millis(100)).await;
3799 let entry = cache2
3800 .entry_by_ref(KEY)
3801 .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
3802 .await;
3803 // Entry should not be fresh because task1's async block should have
3804 // been evaluated instead of ours.
3805 assert!(!entry.is_fresh());
3806 assert_eq!(entry.into_value(), "task1");
3807 }
3808 };
3809
3810 // Task3 will be the third task to call `or_insert_with_if` for the same key.
3811 // By the time it calls, task1's async block should have finished already and
3812 // the value should be already inserted to the cache. Also task3's
3813 // `replace_if` closure returns `false`. So its async block will not be
3814 // evaluated and will get the value inserted by task1's async block
3815 // immediately.
3816 let task3 = {
3817 let cache3 = cache.clone();
3818 async move {
3819 // Wait for 350 ms before calling `or_insert_with_if`.
3820 sleep(Duration::from_millis(350)).await;
3821 let entry = cache3
3822 .entry_by_ref(KEY)
3823 .or_insert_with_if(async { unreachable!() }, |v| {
3824 assert_eq!(v, &"task1");
3825 false
3826 })
3827 .await;
3828 assert!(!entry.is_fresh());
3829 assert_eq!(entry.into_value(), "task1");
3830 }
3831 };
3832
3833 // Task4 will be the fourth task to call `or_insert_with_if` for the same
3834 // key. The value should have been already inserted to the cache by task1.
3835 // However task4's `replace_if` closure returns `true`. So its async block
3836 // will be evaluated to replace the current value.
3837 let task4 = {
3838 let cache4 = cache.clone();
3839 async move {
3840 // Wait for 400 ms before calling `or_insert_with_if`.
3841 sleep(Duration::from_millis(400)).await;
3842 let entry = cache4
3843 .entry_by_ref(KEY)
3844 .or_insert_with_if(async { "task4" }, |v| {
3845 assert_eq!(v, &"task1");
3846 true
3847 })
3848 .await;
3849 assert!(entry.is_fresh());
3850 assert_eq!(entry.into_value(), "task4");
3851 }
3852 };
3853
3854 // Task5 will call `get` for the same key. It will call when task1's async
3855 // block is still running, so it will get none for the key.
3856 let task5 = {
3857 let cache5 = cache.clone();
3858 async move {
3859 // Wait for 200 ms before calling `get`.
3860 sleep(Duration::from_millis(200)).await;
3861 let maybe_v = cache5.get(KEY).await;
3862 assert!(maybe_v.is_none());
3863 }
3864 };
3865
3866 // Task6 will call `get` for the same key. It will call after task1's async
3867 // block finished, so it will get the value insert by task1's async block.
3868 let task6 = {
3869 let cache6 = cache.clone();
3870 async move {
3871 // Wait for 350 ms before calling `get`.
3872 sleep(Duration::from_millis(350)).await;
3873 let maybe_v = cache6.get(KEY).await;
3874 assert_eq!(maybe_v, Some("task1"));
3875 }
3876 };
3877
3878 // Task7 will call `get` for the same key. It will call after task4's async
3879 // block finished, so it will get the value insert by task4's async block.
3880 let task7 = {
3881 let cache7 = cache.clone();
3882 async move {
3883 // Wait for 450 ms before calling `get`.
3884 sleep(Duration::from_millis(450)).await;
3885 let maybe_v = cache7.get(KEY).await;
3886 assert_eq!(maybe_v, Some("task4"));
3887 }
3888 };
3889
3890 futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
3891
3892 assert!(cache.is_waiter_map_empty());
3893 }
3894
3895 #[tokio::test]
3896 async fn try_get_with() {
3897 use std::sync::Arc;
3898
3899 // Note that MyError does not implement std::error::Error trait
3900 // like anyhow::Error.
3901 #[derive(Debug)]
3902 pub struct MyError(#[allow(dead_code)] String);
3903
3904 type MyResult<T> = Result<T, Arc<MyError>>;
3905
3906 let cache = Cache::new(100);
3907 const KEY: u32 = 0;
3908
3909 // This test will run eight async tasks:
3910 //
3911 // Task1 will be the first task to call `get_with` for a key, so its async
3912 // block will be evaluated and then an error will be returned. Nothing will
3913 // be inserted to the cache.
3914 let task1 = {
3915 let cache1 = cache.clone();
3916 async move {
3917 // Call `try_get_with` immediately.
3918 let v = cache1
3919 .try_get_with(KEY, async {
3920 // Wait for 300 ms and return an error.
3921 sleep(Duration::from_millis(300)).await;
3922 Err(MyError("task1 error".into()))
3923 })
3924 .await;
3925 assert!(v.is_err());
3926 }
3927 };
3928
3929 // Task2 will be the second task to call `get_with` for the same key, so its
3930 // async block will not be evaluated. Once task1's async block finishes, it
3931 // will get the same error value returned by task1's async block.
3932 let task2 = {
3933 let cache2 = cache.clone();
3934 async move {
3935 // Wait for 100 ms before calling `try_get_with`.
3936 sleep(Duration::from_millis(100)).await;
3937 let v: MyResult<_> = cache2.try_get_with(KEY, async { unreachable!() }).await;
3938 assert!(v.is_err());
3939 }
3940 };
3941
3942 // Task3 will be the third task to call `get_with` for the same key. By the
3943 // time it calls, task1's async block should have finished already, but the
3944 // key still does not exist in the cache. So its async block will be
3945 // evaluated and then an okay &str value will be returned. That value will be
3946 // inserted to the cache.
3947 let task3 = {
3948 let cache3 = cache.clone();
3949 async move {
3950 // Wait for 400 ms before calling `try_get_with`.
3951 sleep(Duration::from_millis(400)).await;
3952 let v: MyResult<_> = cache3
3953 .try_get_with(KEY, async {
3954 // Wait for 300 ms and return an Ok(&str) value.
3955 sleep(Duration::from_millis(300)).await;
3956 Ok("task3")
3957 })
3958 .await;
3959 assert_eq!(v.unwrap(), "task3");
3960 }
3961 };
3962
3963 // Task4 will be the fourth task to call `get_with` for the same key. So its
3964 // async block will not be evaluated. Once task3's async block finishes, it
3965 // will get the same okay &str value.
3966 let task4 = {
3967 let cache4 = cache.clone();
3968 async move {
3969 // Wait for 500 ms before calling `try_get_with`.
3970 sleep(Duration::from_millis(500)).await;
3971 let v: MyResult<_> = cache4.try_get_with(KEY, async { unreachable!() }).await;
3972 assert_eq!(v.unwrap(), "task3");
3973 }
3974 };
3975
3976 // Task5 will be the fifth task to call `get_with` for the same key. So its
3977 // async block will not be evaluated. By the time it calls, task3's async
3978 // block should have finished already, so its async block will not be
3979 // evaluated and will get the value insert by task3's async block
3980 // immediately.
3981 let task5 = {
3982 let cache5 = cache.clone();
3983 async move {
3984 // Wait for 800 ms before calling `try_get_with`.
3985 sleep(Duration::from_millis(800)).await;
3986 let v: MyResult<_> = cache5.try_get_with(KEY, async { unreachable!() }).await;
3987 assert_eq!(v.unwrap(), "task3");
3988 }
3989 };
3990
3991 // Task6 will call `get` for the same key. It will call when task1's async
3992 // block is still running, so it will get none for the key.
3993 let task6 = {
3994 let cache6 = cache.clone();
3995 async move {
3996 // Wait for 200 ms before calling `get`.
3997 sleep(Duration::from_millis(200)).await;
3998 let maybe_v = cache6.get(&KEY).await;
3999 assert!(maybe_v.is_none());
4000 }
4001 };
4002
4003 // Task7 will call `get` for the same key. It will call after task1's async
4004 // block finished with an error. So it will get none for the key.
4005 let task7 = {
4006 let cache7 = cache.clone();
4007 async move {
4008 // Wait for 400 ms before calling `get`.
4009 sleep(Duration::from_millis(400)).await;
4010 let maybe_v = cache7.get(&KEY).await;
4011 assert!(maybe_v.is_none());
4012 }
4013 };
4014
4015 // Task8 will call `get` for the same key. It will call after task3's async
4016 // block finished, so it will get the value insert by task3's async block.
4017 let task8 = {
4018 let cache8 = cache.clone();
4019 async move {
4020 // Wait for 800 ms before calling `get`.
4021 sleep(Duration::from_millis(800)).await;
4022 let maybe_v = cache8.get(&KEY).await;
4023 assert_eq!(maybe_v, Some("task3"));
4024 }
4025 };
4026
4027 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4028
4029 assert!(cache.is_waiter_map_empty());
4030 }
4031
4032 #[tokio::test]
4033 async fn try_get_with_by_ref() {
4034 use std::sync::Arc;
4035
4036 // Note that MyError does not implement std::error::Error trait
4037 // like anyhow::Error.
4038 #[derive(Debug)]
4039 pub struct MyError(#[allow(dead_code)] String);
4040
4041 type MyResult<T> = Result<T, Arc<MyError>>;
4042
4043 let cache = Cache::new(100);
4044 const KEY: &u32 = &0;
4045
4046 // This test will run eight async tasks:
4047 //
4048 // Task1 will be the first task to call `try_get_with_by_ref` for a key, so
4049 // its async block will be evaluated and then an error will be returned.
4050 // Nothing will be inserted to the cache.
4051 let task1 = {
4052 let cache1 = cache.clone();
4053 async move {
4054 // Call `try_get_with_by_ref` immediately.
4055 let v = cache1
4056 .try_get_with_by_ref(KEY, async {
4057 // Wait for 300 ms and return an error.
4058 sleep(Duration::from_millis(300)).await;
4059 Err(MyError("task1 error".into()))
4060 })
4061 .await;
4062 assert!(v.is_err());
4063 }
4064 };
4065
4066 // Task2 will be the second task to call `get_with` for the same key, so its
4067 // async block will not be evaluated. Once task1's async block finishes, it
4068 // will get the same error value returned by task1's async block.
4069 let task2 = {
4070 let cache2 = cache.clone();
4071 async move {
4072 // Wait for 100 ms before calling `try_get_with_by_ref`.
4073 sleep(Duration::from_millis(100)).await;
4074 let v: MyResult<_> = cache2
4075 .try_get_with_by_ref(KEY, async { unreachable!() })
4076 .await;
4077 assert!(v.is_err());
4078 }
4079 };
4080
4081 // Task3 will be the third task to call `get_with` for the same key. By the
4082 // time it calls, task1's async block should have finished already, but the
4083 // key still does not exist in the cache. So its async block will be
4084 // evaluated and then an okay &str value will be returned. That value will be
4085 // inserted to the cache.
4086 let task3 = {
4087 let cache3 = cache.clone();
4088 async move {
4089 // Wait for 400 ms before calling `try_get_with_by_ref`.
4090 sleep(Duration::from_millis(400)).await;
4091 let v: MyResult<_> = cache3
4092 .try_get_with_by_ref(KEY, async {
4093 // Wait for 300 ms and return an Ok(&str) value.
4094 sleep(Duration::from_millis(300)).await;
4095 Ok("task3")
4096 })
4097 .await;
4098 assert_eq!(v.unwrap(), "task3");
4099 }
4100 };
4101
4102 // Task4 will be the fourth task to call `get_with` for the same key. So its
4103 // async block will not be evaluated. Once task3's async block finishes, it
4104 // will get the same okay &str value.
4105 let task4 = {
4106 let cache4 = cache.clone();
4107 async move {
4108 // Wait for 500 ms before calling `try_get_with_by_ref`.
4109 sleep(Duration::from_millis(500)).await;
4110 let v: MyResult<_> = cache4
4111 .try_get_with_by_ref(KEY, async { unreachable!() })
4112 .await;
4113 assert_eq!(v.unwrap(), "task3");
4114 }
4115 };
4116
4117 // Task5 will be the fifth task to call `get_with` for the same key. So its
4118 // async block will not be evaluated. By the time it calls, task3's async
4119 // block should have finished already, so its async block will not be
4120 // evaluated and will get the value insert by task3's async block
4121 // immediately.
4122 let task5 = {
4123 let cache5 = cache.clone();
4124 async move {
4125 // Wait for 800 ms before calling `try_get_with_by_ref`.
4126 sleep(Duration::from_millis(800)).await;
4127 let v: MyResult<_> = cache5
4128 .try_get_with_by_ref(KEY, async { unreachable!() })
4129 .await;
4130 assert_eq!(v.unwrap(), "task3");
4131 }
4132 };
4133
4134 // Task6 will call `get` for the same key. It will call when task1's async
4135 // block is still running, so it will get none for the key.
4136 let task6 = {
4137 let cache6 = cache.clone();
4138 async move {
4139 // Wait for 200 ms before calling `get`.
4140 sleep(Duration::from_millis(200)).await;
4141 let maybe_v = cache6.get(KEY).await;
4142 assert!(maybe_v.is_none());
4143 }
4144 };
4145
4146 // Task7 will call `get` for the same key. It will call after task1's async
4147 // block finished with an error. So it will get none for the key.
4148 let task7 = {
4149 let cache7 = cache.clone();
4150 async move {
4151 // Wait for 400 ms before calling `get`.
4152 sleep(Duration::from_millis(400)).await;
4153 let maybe_v = cache7.get(KEY).await;
4154 assert!(maybe_v.is_none());
4155 }
4156 };
4157
4158 // Task8 will call `get` for the same key. It will call after task3's async
4159 // block finished, so it will get the value insert by task3's async block.
4160 let task8 = {
4161 let cache8 = cache.clone();
4162 async move {
4163 // Wait for 800 ms before calling `get`.
4164 sleep(Duration::from_millis(800)).await;
4165 let maybe_v = cache8.get(KEY).await;
4166 assert_eq!(maybe_v, Some("task3"));
4167 }
4168 };
4169
4170 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4171
4172 assert!(cache.is_waiter_map_empty());
4173 }
4174
4175 #[tokio::test]
4176 async fn optionally_get_with() {
4177 let cache = Cache::new(100);
4178 const KEY: u32 = 0;
4179
4180 // This test will run eight async tasks:
4181 //
4182 // Task1 will be the first task to call `optionally_get_with` for a key,
4183 // so its async block will be evaluated and then an None will be
4184 // returned. Nothing will be inserted to the cache.
4185 let task1 = {
4186 let cache1 = cache.clone();
4187 async move {
4188 // Call `try_get_with` immediately.
4189 let v = cache1
4190 .optionally_get_with(KEY, async {
4191 // Wait for 300 ms and return an None.
4192 sleep(Duration::from_millis(300)).await;
4193 None
4194 })
4195 .await;
4196 assert!(v.is_none());
4197 }
4198 };
4199
4200 // Task2 will be the second task to call `optionally_get_with` for the same
4201 // key, so its async block will not be evaluated. Once task1's async block
4202 // finishes, it will get the same error value returned by task1's async
4203 // block.
4204 let task2 = {
4205 let cache2 = cache.clone();
4206 async move {
4207 // Wait for 100 ms before calling `optionally_get_with`.
4208 sleep(Duration::from_millis(100)).await;
4209 let v = cache2
4210 .optionally_get_with(KEY, async { unreachable!() })
4211 .await;
4212 assert!(v.is_none());
4213 }
4214 };
4215
4216 // Task3 will be the third task to call `optionally_get_with` for the
4217 // same key. By the time it calls, task1's async block should have
4218 // finished already, but the key still does not exist in the cache. So
4219 // its async block will be evaluated and then an okay &str value will be
4220 // returned. That value will be inserted to the cache.
4221 let task3 = {
4222 let cache3 = cache.clone();
4223 async move {
4224 // Wait for 400 ms before calling `optionally_get_with`.
4225 sleep(Duration::from_millis(400)).await;
4226 let v = cache3
4227 .optionally_get_with(KEY, async {
4228 // Wait for 300 ms and return an Some(&str) value.
4229 sleep(Duration::from_millis(300)).await;
4230 Some("task3")
4231 })
4232 .await;
4233 assert_eq!(v.unwrap(), "task3");
4234 }
4235 };
4236
4237 // Task4 will be the fourth task to call `optionally_get_with` for the
4238 // same key. So its async block will not be evaluated. Once task3's
4239 // async block finishes, it will get the same okay &str value.
4240 let task4 = {
4241 let cache4 = cache.clone();
4242 async move {
4243 // Wait for 500 ms before calling `try_get_with`.
4244 sleep(Duration::from_millis(500)).await;
4245 let v = cache4
4246 .optionally_get_with(KEY, async { unreachable!() })
4247 .await;
4248 assert_eq!(v.unwrap(), "task3");
4249 }
4250 };
4251
4252 // Task5 will be the fifth task to call `optionally_get_with` for the
4253 // same key. So its async block will not be evaluated. By the time it
4254 // calls, task3's async block should have finished already, so its async
4255 // block will not be evaluated and will get the value insert by task3's
4256 // async block immediately.
4257 let task5 = {
4258 let cache5 = cache.clone();
4259 async move {
4260 // Wait for 800 ms before calling `optionally_get_with`.
4261 sleep(Duration::from_millis(800)).await;
4262 let v = cache5
4263 .optionally_get_with(KEY, async { unreachable!() })
4264 .await;
4265 assert_eq!(v.unwrap(), "task3");
4266 }
4267 };
4268
4269 // Task6 will call `get` for the same key. It will call when task1's async
4270 // block is still running, so it will get none for the key.
4271 let task6 = {
4272 let cache6 = cache.clone();
4273 async move {
4274 // Wait for 200 ms before calling `get`.
4275 sleep(Duration::from_millis(200)).await;
4276 let maybe_v = cache6.get(&KEY).await;
4277 assert!(maybe_v.is_none());
4278 }
4279 };
4280
4281 // Task7 will call `get` for the same key. It will call after task1's async
4282 // block finished with an error. So it will get none for the key.
4283 let task7 = {
4284 let cache7 = cache.clone();
4285 async move {
4286 // Wait for 400 ms before calling `get`.
4287 sleep(Duration::from_millis(400)).await;
4288 let maybe_v = cache7.get(&KEY).await;
4289 assert!(maybe_v.is_none());
4290 }
4291 };
4292
4293 // Task8 will call `get` for the same key. It will call after task3's async
4294 // block finished, so it will get the value insert by task3's async block.
4295 let task8 = {
4296 let cache8 = cache.clone();
4297 async move {
4298 // Wait for 800 ms before calling `get`.
4299 sleep(Duration::from_millis(800)).await;
4300 let maybe_v = cache8.get(&KEY).await;
4301 assert_eq!(maybe_v, Some("task3"));
4302 }
4303 };
4304
4305 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4306 }
4307
4308 #[tokio::test]
4309 async fn optionally_get_with_by_ref() {
4310 let cache = Cache::new(100);
4311 const KEY: &u32 = &0;
4312
4313 // This test will run eight async tasks:
4314 //
4315 // Task1 will be the first task to call `optionally_get_with_by_ref` for a
4316 // key, so its async block will be evaluated and then an None will be
4317 // returned. Nothing will be inserted to the cache.
4318 let task1 = {
4319 let cache1 = cache.clone();
4320 async move {
4321 // Call `try_get_with` immediately.
4322 let v = cache1
4323 .optionally_get_with_by_ref(KEY, async {
4324 // Wait for 300 ms and return an None.
4325 sleep(Duration::from_millis(300)).await;
4326 None
4327 })
4328 .await;
4329 assert!(v.is_none());
4330 }
4331 };
4332
4333 // Task2 will be the second task to call `optionally_get_with_by_ref` for the
4334 // same key, so its async block will not be evaluated. Once task1's async
4335 // block finishes, it will get the same error value returned by task1's async
4336 // block.
4337 let task2 = {
4338 let cache2 = cache.clone();
4339 async move {
4340 // Wait for 100 ms before calling `optionally_get_with_by_ref`.
4341 sleep(Duration::from_millis(100)).await;
4342 let v = cache2
4343 .optionally_get_with_by_ref(KEY, async { unreachable!() })
4344 .await;
4345 assert!(v.is_none());
4346 }
4347 };
4348
4349 // Task3 will be the third task to call `optionally_get_with_by_ref` for the
4350 // same key. By the time it calls, task1's async block should have
4351 // finished already, but the key still does not exist in the cache. So
4352 // its async block will be evaluated and then an okay &str value will be
4353 // returned. That value will be inserted to the cache.
4354 let task3 = {
4355 let cache3 = cache.clone();
4356 async move {
4357 // Wait for 400 ms before calling `optionally_get_with_by_ref`.
4358 sleep(Duration::from_millis(400)).await;
4359 let v = cache3
4360 .optionally_get_with_by_ref(KEY, async {
4361 // Wait for 300 ms and return an Some(&str) value.
4362 sleep(Duration::from_millis(300)).await;
4363 Some("task3")
4364 })
4365 .await;
4366 assert_eq!(v.unwrap(), "task3");
4367 }
4368 };
4369
4370 // Task4 will be the fourth task to call `optionally_get_with_by_ref` for the
4371 // same key. So its async block will not be evaluated. Once task3's
4372 // async block finishes, it will get the same okay &str value.
4373 let task4 = {
4374 let cache4 = cache.clone();
4375 async move {
4376 // Wait for 500 ms before calling `try_get_with`.
4377 sleep(Duration::from_millis(500)).await;
4378 let v = cache4
4379 .optionally_get_with_by_ref(KEY, async { unreachable!() })
4380 .await;
4381 assert_eq!(v.unwrap(), "task3");
4382 }
4383 };
4384
4385 // Task5 will be the fifth task to call `optionally_get_with_by_ref` for the
4386 // same key. So its async block will not be evaluated. By the time it
4387 // calls, task3's async block should have finished already, so its async
4388 // block will not be evaluated and will get the value insert by task3's
4389 // async block immediately.
4390 let task5 = {
4391 let cache5 = cache.clone();
4392 async move {
4393 // Wait for 800 ms before calling `optionally_get_with_by_ref`.
4394 sleep(Duration::from_millis(800)).await;
4395 let v = cache5
4396 .optionally_get_with_by_ref(KEY, async { unreachable!() })
4397 .await;
4398 assert_eq!(v.unwrap(), "task3");
4399 }
4400 };
4401
4402 // Task6 will call `get` for the same key. It will call when task1's async
4403 // block is still running, so it will get none for the key.
4404 let task6 = {
4405 let cache6 = cache.clone();
4406 async move {
4407 // Wait for 200 ms before calling `get`.
4408 sleep(Duration::from_millis(200)).await;
4409 let maybe_v = cache6.get(KEY).await;
4410 assert!(maybe_v.is_none());
4411 }
4412 };
4413
4414 // Task7 will call `get` for the same key. It will call after task1's async
4415 // block finished with an error. So it will get none for the key.
4416 let task7 = {
4417 let cache7 = cache.clone();
4418 async move {
4419 // Wait for 400 ms before calling `get`.
4420 sleep(Duration::from_millis(400)).await;
4421 let maybe_v = cache7.get(KEY).await;
4422 assert!(maybe_v.is_none());
4423 }
4424 };
4425
4426 // Task8 will call `get` for the same key. It will call after task3's async
4427 // block finished, so it will get the value insert by task3's async block.
4428 let task8 = {
4429 let cache8 = cache.clone();
4430 async move {
4431 // Wait for 800 ms before calling `get`.
4432 sleep(Duration::from_millis(800)).await;
4433 let maybe_v = cache8.get(KEY).await;
4434 assert_eq!(maybe_v, Some("task3"));
4435 }
4436 };
4437
4438 futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4439
4440 assert!(cache.is_waiter_map_empty());
4441 }
4442
4443 #[tokio::test]
4444 async fn upsert_with() {
4445 let cache = Cache::new(100);
4446 const KEY: u32 = 0;
4447
4448 // Spawn three async tasks to call `and_upsert_with` for the same key and
4449 // each task increments the current value by 1. Ensure the key-level lock is
4450 // working by verifying the value is 3 after all tasks finish.
4451 //
4452 // | | task 1 | task 2 | task 3 |
4453 // |--------|----------|----------|----------|
4454 // | 0 ms | get none | | |
4455 // | 100 ms | | blocked | |
4456 // | 200 ms | insert 1 | | |
4457 // | | | get 1 | |
4458 // | 300 ms | | | blocked |
4459 // | 400 ms | | insert 2 | |
4460 // | | | | get 2 |
4461 // | 500 ms | | | insert 3 |
4462
4463 let task1 = {
4464 let cache1 = cache.clone();
4465 async move {
4466 cache1
4467 .entry(KEY)
4468 .and_upsert_with(|maybe_entry| async move {
4469 sleep(Duration::from_millis(200)).await;
4470 assert!(maybe_entry.is_none());
4471 1
4472 })
4473 .await
4474 }
4475 };
4476
4477 let task2 = {
4478 let cache2 = cache.clone();
4479 async move {
4480 sleep(Duration::from_millis(100)).await;
4481 cache2
4482 .entry_by_ref(&KEY)
4483 .and_upsert_with(|maybe_entry| async move {
4484 sleep(Duration::from_millis(200)).await;
4485 let entry = maybe_entry.expect("The entry should exist");
4486 entry.into_value() + 1
4487 })
4488 .await
4489 }
4490 };
4491
4492 let task3 = {
4493 let cache3 = cache.clone();
4494 async move {
4495 sleep(Duration::from_millis(300)).await;
4496 cache3
4497 .entry_by_ref(&KEY)
4498 .and_upsert_with(|maybe_entry| async move {
4499 sleep(Duration::from_millis(100)).await;
4500 let entry = maybe_entry.expect("The entry should exist");
4501 entry.into_value() + 1
4502 })
4503 .await
4504 }
4505 };
4506
4507 let (ent1, ent2, ent3) = futures_util::join!(task1, task2, task3);
4508 assert_eq!(ent1.into_value(), 1);
4509 assert_eq!(ent2.into_value(), 2);
4510 assert_eq!(ent3.into_value(), 3);
4511
4512 assert_eq!(cache.get(&KEY).await, Some(3));
4513
4514 assert!(cache.is_waiter_map_empty());
4515 }
4516
4517 #[tokio::test]
4518 async fn compute_with() {
4519 use crate::ops::compute;
4520 use tokio::sync::RwLock;
4521
4522 let cache = Cache::new(100);
4523 const KEY: u32 = 0;
4524
4525 // Spawn six async tasks to call `and_compute_with` for the same key. Ensure
4526 // the key-level lock is working by verifying the value after all tasks
4527 // finish.
4528 //
4529 // | | task 1 | task 2 | task 3 | task 4 | task 5 | task 6 |
4530 // |---------|------------|---------------|------------|----------|------------|---------|
4531 // | 0 ms | get none | | | | | |
4532 // | 100 ms | | blocked | | | | |
4533 // | 200 ms | insert [1] | | | | | |
4534 // | | | get [1] | | | | |
4535 // | 300 ms | | | blocked | | | |
4536 // | 400 ms | | insert [1, 2] | | | | |
4537 // | | | | get [1, 2] | | | |
4538 // | 500 ms | | | | blocked | | |
4539 // | 600 ms | | | remove | | | |
4540 // | | | | | get none | | |
4541 // | 700 ms | | | | | blocked | |
4542 // | 800 ms | | | | nop | | |
4543 // | | | | | | get none | |
4544 // | 900 ms | | | | | | blocked |
4545 // | 1000 ms | | | | | insert [5] | |
4546 // | | | | | | | get [5] |
4547 // | 1100 ms | | | | | | nop |
4548
4549 let task1 = {
4550 let cache1 = cache.clone();
4551 async move {
4552 cache1
4553 .entry(KEY)
4554 .and_compute_with(|maybe_entry| async move {
4555 sleep(Duration::from_millis(200)).await;
4556 assert!(maybe_entry.is_none());
4557 compute::Op::Put(Arc::new(RwLock::new(vec![1])))
4558 })
4559 .await
4560 }
4561 };
4562
4563 let task2 = {
4564 let cache2 = cache.clone();
4565 async move {
4566 sleep(Duration::from_millis(100)).await;
4567 cache2
4568 .entry_by_ref(&KEY)
4569 .and_compute_with(|maybe_entry| async move {
4570 let entry = maybe_entry.expect("The entry should exist");
4571 let value = entry.into_value();
4572 assert_eq!(*value.read().await, vec![1]);
4573 sleep(Duration::from_millis(200)).await;
4574 value.write().await.push(2);
4575 compute::Op::Put(value)
4576 })
4577 .await
4578 }
4579 };
4580
4581 let task3 = {
4582 let cache3 = cache.clone();
4583 async move {
4584 sleep(Duration::from_millis(300)).await;
4585 cache3
4586 .entry(KEY)
4587 .and_compute_with(|maybe_entry| async move {
4588 let entry = maybe_entry.expect("The entry should exist");
4589 let value = entry.into_value();
4590 assert_eq!(*value.read().await, vec![1, 2]);
4591 sleep(Duration::from_millis(200)).await;
4592 compute::Op::Remove
4593 })
4594 .await
4595 }
4596 };
4597
4598 let task4 = {
4599 let cache4 = cache.clone();
4600 async move {
4601 sleep(Duration::from_millis(500)).await;
4602 cache4
4603 .entry(KEY)
4604 .and_compute_with(|maybe_entry| async move {
4605 assert!(maybe_entry.is_none());
4606 sleep(Duration::from_millis(200)).await;
4607 compute::Op::Nop
4608 })
4609 .await
4610 }
4611 };
4612
4613 let task5 = {
4614 let cache5 = cache.clone();
4615 async move {
4616 sleep(Duration::from_millis(700)).await;
4617 cache5
4618 .entry_by_ref(&KEY)
4619 .and_compute_with(|maybe_entry| async move {
4620 assert!(maybe_entry.is_none());
4621 sleep(Duration::from_millis(200)).await;
4622 compute::Op::Put(Arc::new(RwLock::new(vec![5])))
4623 })
4624 .await
4625 }
4626 };
4627
4628 let task6 = {
4629 let cache6 = cache.clone();
4630 async move {
4631 sleep(Duration::from_millis(900)).await;
4632 cache6
4633 .entry_by_ref(&KEY)
4634 .and_compute_with(|maybe_entry| async move {
4635 let entry = maybe_entry.expect("The entry should exist");
4636 let value = entry.into_value();
4637 assert_eq!(*value.read().await, vec![5]);
4638 sleep(Duration::from_millis(100)).await;
4639 compute::Op::Nop
4640 })
4641 .await
4642 }
4643 };
4644
4645 let (res1, res2, res3, res4, res5, res6) =
4646 futures_util::join!(task1, task2, task3, task4, task5, task6);
4647
4648 let compute::CompResult::Inserted(entry) = res1 else {
4649 panic!("Expected `Inserted`. Got {res1:?}")
4650 };
4651 assert_eq!(
4652 *entry.into_value().read().await,
4653 vec![1, 2] // The same Vec was modified by task2.
4654 );
4655
4656 let compute::CompResult::ReplacedWith(entry) = res2 else {
4657 panic!("Expected `ReplacedWith`. Got {res2:?}")
4658 };
4659 assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4660
4661 let compute::CompResult::Removed(entry) = res3 else {
4662 panic!("Expected `Removed`. Got {res3:?}")
4663 };
4664 assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4665
4666 let compute::CompResult::StillNone(key) = res4 else {
4667 panic!("Expected `StillNone`. Got {res4:?}")
4668 };
4669 assert_eq!(*key, KEY);
4670
4671 let compute::CompResult::Inserted(entry) = res5 else {
4672 panic!("Expected `Inserted`. Got {res5:?}")
4673 };
4674 assert_eq!(*entry.into_value().read().await, vec![5]);
4675
4676 let compute::CompResult::Unchanged(entry) = res6 else {
4677 panic!("Expected `Unchanged`. Got {res6:?}")
4678 };
4679 assert_eq!(*entry.into_value().read().await, vec![5]);
4680
4681 assert!(cache.is_waiter_map_empty());
4682 }
4683
4684 #[tokio::test]
4685 async fn try_compute_with() {
4686 use crate::ops::compute;
4687 use tokio::sync::RwLock;
4688
4689 let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
4690 const KEY: u32 = 0;
4691
4692 // Spawn four async tasks to call `and_try_compute_with` for the same key.
4693 // Ensure the key-level lock is working by verifying the value after all
4694 // tasks finish.
4695 //
4696 // | | task 1 | task 2 | task 3 | task 4 |
4697 // |---------|------------|---------------|------------|------------|
4698 // | 0 ms | get none | | | |
4699 // | 100 ms | | blocked | | |
4700 // | 200 ms | insert [1] | | | |
4701 // | | | get [1] | | |
4702 // | 300 ms | | | blocked | |
4703 // | 400 ms | | insert [1, 2] | | |
4704 // | | | | get [1, 2] | |
4705 // | 500 ms | | | | blocked |
4706 // | 600 ms | | | err | |
4707 // | | | | | get [1, 2] |
4708 // | 700 ms | | | | remove |
4709 //
4710 // This test is shorter than `compute_with` test because this one omits `Nop`
4711 // cases.
4712
4713 let task1 = {
4714 let cache1 = cache.clone();
4715 async move {
4716 cache1
4717 .entry(KEY)
4718 .and_try_compute_with(|maybe_entry| async move {
4719 sleep(Duration::from_millis(200)).await;
4720 assert!(maybe_entry.is_none());
4721 Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
4722 })
4723 .await
4724 }
4725 };
4726
4727 let task2 = {
4728 let cache2 = cache.clone();
4729 async move {
4730 sleep(Duration::from_millis(100)).await;
4731 cache2
4732 .entry_by_ref(&KEY)
4733 .and_try_compute_with(|maybe_entry| async move {
4734 let entry = maybe_entry.expect("The entry should exist");
4735 let value = entry.into_value();
4736 assert_eq!(*value.read().await, vec![1]);
4737 sleep(Duration::from_millis(200)).await;
4738 value.write().await.push(2);
4739 Ok(compute::Op::Put(value)) as Result<_, ()>
4740 })
4741 .await
4742 }
4743 };
4744
4745 let task3 = {
4746 let cache3 = cache.clone();
4747 async move {
4748 sleep(Duration::from_millis(300)).await;
4749 cache3
4750 .entry(KEY)
4751 .and_try_compute_with(|maybe_entry| async move {
4752 let entry = maybe_entry.expect("The entry should exist");
4753 let value = entry.into_value();
4754 assert_eq!(*value.read().await, vec![1, 2]);
4755 sleep(Duration::from_millis(200)).await;
4756 Err(())
4757 })
4758 .await
4759 }
4760 };
4761
4762 let task4 = {
4763 let cache4 = cache.clone();
4764 async move {
4765 sleep(Duration::from_millis(500)).await;
4766 cache4
4767 .entry(KEY)
4768 .and_try_compute_with(|maybe_entry| async move {
4769 let entry = maybe_entry.expect("The entry should exist");
4770 let value = entry.into_value();
4771 assert_eq!(*value.read().await, vec![1, 2]);
4772 sleep(Duration::from_millis(100)).await;
4773 Ok(compute::Op::Remove) as Result<_, ()>
4774 })
4775 .await
4776 }
4777 };
4778
4779 let (res1, res2, res3, res4) = futures_util::join!(task1, task2, task3, task4);
4780
4781 let Ok(compute::CompResult::Inserted(entry)) = res1 else {
4782 panic!("Expected `Inserted`. Got {res1:?}")
4783 };
4784 assert_eq!(
4785 *entry.into_value().read().await,
4786 vec![1, 2] // The same Vec was modified by task2.
4787 );
4788
4789 let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
4790 panic!("Expected `ReplacedWith`. Got {res2:?}")
4791 };
4792 assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4793
4794 assert!(res3.is_err());
4795
4796 let Ok(compute::CompResult::Removed(entry)) = res4 else {
4797 panic!("Expected `Removed`. Got {res4:?}")
4798 };
4799 assert_eq!(
4800 *entry.into_value().read().await,
4801 vec![1, 2] // Removed value.
4802 );
4803
4804 assert!(cache.is_waiter_map_empty());
4805 }
4806
4807 #[tokio::test]
4808 // https://github.com/moka-rs/moka/issues/43
4809 async fn handle_panic_in_get_with() {
4810 use tokio::time::{sleep, Duration};
4811
4812 let cache = Cache::new(16);
4813 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4814 {
4815 let cache_ref = cache.clone();
4816 let semaphore_ref = semaphore.clone();
4817 tokio::task::spawn(async move {
4818 let _ = cache_ref
4819 .get_with(1, async move {
4820 semaphore_ref.add_permits(1);
4821 sleep(Duration::from_millis(50)).await;
4822 panic!("Panic during try_get_with");
4823 })
4824 .await;
4825 });
4826 }
4827 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4828 assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4829 }
4830
4831 #[tokio::test]
4832 // https://github.com/moka-rs/moka/issues/43
4833 async fn handle_panic_in_try_get_with() {
4834 use tokio::time::{sleep, Duration};
4835
4836 let cache = Cache::new(16);
4837 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4838 {
4839 let cache_ref = cache.clone();
4840 let semaphore_ref = semaphore.clone();
4841 tokio::task::spawn(async move {
4842 let _ = cache_ref
4843 .try_get_with(1, async move {
4844 semaphore_ref.add_permits(1);
4845 sleep(Duration::from_millis(50)).await;
4846 panic!("Panic during try_get_with");
4847 })
4848 .await as Result<_, Arc<Infallible>>;
4849 });
4850 }
4851 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4852 assert_eq!(
4853 cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4854 Ok(5)
4855 );
4856
4857 assert!(cache.is_waiter_map_empty());
4858 }
4859
4860 #[tokio::test]
4861 // https://github.com/moka-rs/moka/issues/59
4862 async fn abort_get_with() {
4863 use tokio::time::{sleep, Duration};
4864
4865 let cache = Cache::new(16);
4866 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4867
4868 let handle;
4869 {
4870 let cache_ref = cache.clone();
4871 let semaphore_ref = semaphore.clone();
4872
4873 handle = tokio::task::spawn(async move {
4874 let _ = cache_ref
4875 .get_with(1, async move {
4876 semaphore_ref.add_permits(1);
4877 sleep(Duration::from_millis(50)).await;
4878 unreachable!();
4879 })
4880 .await;
4881 });
4882 }
4883
4884 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4885 handle.abort();
4886
4887 assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4888
4889 assert!(cache.is_waiter_map_empty());
4890 }
4891
4892 #[tokio::test]
4893 // https://github.com/moka-rs/moka/issues/59
4894 async fn abort_try_get_with() {
4895 use tokio::time::{sleep, Duration};
4896
4897 let cache = Cache::new(16);
4898 let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4899
4900 let handle;
4901 {
4902 let cache_ref = cache.clone();
4903 let semaphore_ref = semaphore.clone();
4904
4905 handle = tokio::task::spawn(async move {
4906 let _ = cache_ref
4907 .try_get_with(1, async move {
4908 semaphore_ref.add_permits(1);
4909 sleep(Duration::from_millis(50)).await;
4910 unreachable!();
4911 })
4912 .await as Result<_, Arc<Infallible>>;
4913 });
4914 }
4915
4916 let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4917 handle.abort();
4918
4919 assert_eq!(
4920 cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4921 Ok(5)
4922 );
4923
4924 assert!(cache.is_waiter_map_empty());
4925 }
4926
4927 #[tokio::test]
4928 async fn test_removal_notifications() {
4929 // The following `Vec`s will hold actual and expected notifications.
4930 let actual = Arc::new(Mutex::new(Vec::new()));
4931 let mut expected = Vec::new();
4932
4933 // Create an eviction listener.
4934 let a1 = Arc::clone(&actual);
4935 let listener = move |k, v, cause| -> ListenerFuture {
4936 let a2 = Arc::clone(&a1);
4937 async move {
4938 a2.lock().await.push((k, v, cause));
4939 }
4940 .boxed()
4941 };
4942
4943 // Create a cache with the eviction listener.
4944 let mut cache = Cache::builder()
4945 .max_capacity(3)
4946 .async_eviction_listener(listener)
4947 .build();
4948 cache.reconfigure_for_testing().await;
4949
4950 // Make the cache exterior immutable.
4951 let cache = cache;
4952
4953 cache.insert('a', "alice").await;
4954 cache.invalidate(&'a').await;
4955 expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
4956
4957 cache.run_pending_tasks().await;
4958 assert_eq!(cache.entry_count(), 0);
4959
4960 cache.insert('b', "bob").await;
4961 cache.insert('c', "cathy").await;
4962 cache.insert('d', "david").await;
4963 cache.run_pending_tasks().await;
4964 assert_eq!(cache.entry_count(), 3);
4965
4966 // This will be rejected due to the size constraint.
4967 cache.insert('e', "emily").await;
4968 expected.push((Arc::new('e'), "emily", RemovalCause::Size));
4969 cache.run_pending_tasks().await;
4970 assert_eq!(cache.entry_count(), 3);
4971
4972 // Raise the popularity of 'e' so it will be accepted next time.
4973 cache.get(&'e').await;
4974 cache.run_pending_tasks().await;
4975
4976 // Retry.
4977 cache.insert('e', "eliza").await;
4978 // and the LRU entry will be evicted.
4979 expected.push((Arc::new('b'), "bob", RemovalCause::Size));
4980 cache.run_pending_tasks().await;
4981 assert_eq!(cache.entry_count(), 3);
4982
4983 // Replace an existing entry.
4984 cache.insert('d', "dennis").await;
4985 expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
4986 cache.run_pending_tasks().await;
4987 assert_eq!(cache.entry_count(), 3);
4988
4989 verify_notification_vec(&cache, actual, &expected).await;
4990 }
4991
4992 #[tokio::test]
4993 async fn test_removal_notifications_with_updates() {
4994 // The following `Vec`s will hold actual and expected notifications.
4995 let actual = Arc::new(Mutex::new(Vec::new()));
4996 let mut expected = Vec::new();
4997
4998 // Create an eviction listener.
4999 let a1 = Arc::clone(&actual);
5000 let listener = move |k, v, cause| -> ListenerFuture {
5001 let a2 = Arc::clone(&a1);
5002 async move {
5003 a2.lock().await.push((k, v, cause));
5004 }
5005 .boxed()
5006 };
5007
5008 let (clock, mock) = Clock::mock();
5009
5010 // Create a cache with the eviction listener and also TTL and TTI.
5011 let mut cache = Cache::builder()
5012 .async_eviction_listener(listener)
5013 .time_to_live(Duration::from_secs(7))
5014 .time_to_idle(Duration::from_secs(5))
5015 .clock(clock)
5016 .build();
5017 cache.reconfigure_for_testing().await;
5018
5019 // Make the cache exterior immutable.
5020 let cache = cache;
5021
5022 cache.insert("alice", "a0").await;
5023 cache.run_pending_tasks().await;
5024
5025 // Now alice (a0) has been expired by the idle timeout (TTI).
5026 mock.increment(Duration::from_secs(6));
5027 expected.push((Arc::new("alice"), "a0", RemovalCause::Expired));
5028 assert_eq!(cache.get(&"alice").await, None);
5029
5030 // We have not ran sync after the expiration of alice (a0), so it is
5031 // still in the cache.
5032 assert_eq!(cache.entry_count(), 1);
5033
5034 // Re-insert alice with a different value. Since alice (a0) is still
5035 // in the cache, this is actually a replace operation rather than an
5036 // insert operation. We want to verify that the RemovalCause of a0 is
5037 // Expired, not Replaced.
5038 cache.insert("alice", "a1").await;
5039 cache.run_pending_tasks().await;
5040
5041 mock.increment(Duration::from_secs(4));
5042 assert_eq!(cache.get(&"alice").await, Some("a1"));
5043 cache.run_pending_tasks().await;
5044
5045 // Now alice has been expired by time-to-live (TTL).
5046 mock.increment(Duration::from_secs(4));
5047 expected.push((Arc::new("alice"), "a1", RemovalCause::Expired));
5048 assert_eq!(cache.get(&"alice").await, None);
5049
5050 // But, again, it is still in the cache.
5051 assert_eq!(cache.entry_count(), 1);
5052
5053 // Re-insert alice with a different value and verify that the
5054 // RemovalCause of a1 is Expired (not Replaced).
5055 cache.insert("alice", "a2").await;
5056 cache.run_pending_tasks().await;
5057
5058 assert_eq!(cache.entry_count(), 1);
5059
5060 // Now alice (a2) has been expired by the idle timeout.
5061 mock.increment(Duration::from_secs(6));
5062 expected.push((Arc::new("alice"), "a2", RemovalCause::Expired));
5063 assert_eq!(cache.get(&"alice").await, None);
5064 assert_eq!(cache.entry_count(), 1);
5065
5066 // This invalidate will internally remove alice (a2).
5067 cache.invalidate(&"alice").await;
5068 cache.run_pending_tasks().await;
5069 assert_eq!(cache.entry_count(), 0);
5070
5071 // Re-insert, and this time, make it expired by the TTL.
5072 cache.insert("alice", "a3").await;
5073 cache.run_pending_tasks().await;
5074 mock.increment(Duration::from_secs(4));
5075 assert_eq!(cache.get(&"alice").await, Some("a3"));
5076 cache.run_pending_tasks().await;
5077 mock.increment(Duration::from_secs(4));
5078 expected.push((Arc::new("alice"), "a3", RemovalCause::Expired));
5079 assert_eq!(cache.get(&"alice").await, None);
5080 assert_eq!(cache.entry_count(), 1);
5081
5082 // This invalidate will internally remove alice (a2).
5083 cache.invalidate(&"alice").await;
5084 cache.run_pending_tasks().await;
5085 assert_eq!(cache.entry_count(), 0);
5086
5087 verify_notification_vec(&cache, actual, &expected).await;
5088 }
5089
5090 // When the eviction listener is not set, calling `run_pending_tasks` once should
5091 // evict all entries that can be removed.
5092 #[tokio::test]
5093 async fn no_batch_size_limit_on_eviction() {
5094 const MAX_CAPACITY: u64 = 20;
5095
5096 const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
5097 const MAX_LOG_SYNC_REPEATS: u32 = 1;
5098 const EVICTION_BATCH_SIZE: u32 = 1;
5099
5100 let hk_conf = HousekeeperConfig::new(
5101 // Timeout should be ignored when the eviction listener is not provided.
5102 Some(EVICTION_TIMEOUT),
5103 Some(MAX_LOG_SYNC_REPEATS),
5104 Some(EVICTION_BATCH_SIZE),
5105 );
5106
5107 // Create a cache with the LRU policy.
5108 let mut cache = Cache::builder()
5109 .max_capacity(MAX_CAPACITY)
5110 .eviction_policy(EvictionPolicy::lru())
5111 .housekeeper_config(hk_conf)
5112 .build();
5113 cache.reconfigure_for_testing().await;
5114
5115 // Make the cache exterior immutable.
5116 let cache = cache;
5117
5118 // Fill the cache.
5119 for i in 0..MAX_CAPACITY {
5120 let v = format!("v{i}");
5121 cache.insert(i, v).await
5122 }
5123 // The max capacity should not change because we have not called
5124 // `run_pending_tasks` yet.
5125 assert_eq!(cache.entry_count(), 0);
5126
5127 cache.run_pending_tasks().await;
5128 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5129
5130 // Insert more items to the cache.
5131 for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5132 let v = format!("v{i}");
5133 cache.insert(i, v).await
5134 }
5135 // The max capacity should not change because we have not called
5136 // `run_pending_tasks` yet.
5137 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5138 // Both old and new keys should exist.
5139 assert!(cache.contains_key(&0)); // old
5140 assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
5141 assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new
5142
5143 // Process the remaining write op logs (there should be MAX_CAPACITY logs),
5144 // and evict the LRU entries.
5145 cache.run_pending_tasks().await;
5146 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5147
5148 // Now all the old keys should be gone.
5149 assert!(!cache.contains_key(&0));
5150 assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
5151 // And the new keys should exist.
5152 assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
5153 }
5154
5155 #[tokio::test]
5156 async fn slow_eviction_listener() {
5157 const MAX_CAPACITY: u64 = 20;
5158
5159 const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
5160 const LISTENER_DELAY: Duration = Duration::from_millis(11);
5161 const MAX_LOG_SYNC_REPEATS: u32 = 1;
5162 const EVICTION_BATCH_SIZE: u32 = 1;
5163
5164 let hk_conf = HousekeeperConfig::new(
5165 Some(EVICTION_TIMEOUT),
5166 Some(MAX_LOG_SYNC_REPEATS),
5167 Some(EVICTION_BATCH_SIZE),
5168 );
5169
5170 let (clock, mock) = Clock::mock();
5171 let listener_call_count = Arc::new(AtomicU8::new(0));
5172 let lcc = Arc::clone(&listener_call_count);
5173
5174 // A slow eviction listener that spend `LISTENER_DELAY` to process a removal
5175 // notification.
5176 let listener = move |_k, _v, _cause| {
5177 mock.increment(LISTENER_DELAY);
5178 lcc.fetch_add(1, Ordering::AcqRel);
5179 };
5180
5181 // Create a cache with the LRU policy.
5182 let mut cache = Cache::builder()
5183 .max_capacity(MAX_CAPACITY)
5184 .eviction_policy(EvictionPolicy::lru())
5185 .eviction_listener(listener)
5186 .housekeeper_config(hk_conf)
5187 .clock(clock)
5188 .build();
5189 cache.reconfigure_for_testing().await;
5190
5191 // Make the cache exterior immutable.
5192 let cache = cache;
5193
5194 // Fill the cache.
5195 for i in 0..MAX_CAPACITY {
5196 let v = format!("v{i}");
5197 cache.insert(i, v).await
5198 }
5199 // The max capacity should not change because we have not called
5200 // `run_pending_tasks` yet.
5201 assert_eq!(cache.entry_count(), 0);
5202
5203 cache.run_pending_tasks().await;
5204 assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
5205 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5206
5207 // Insert more items to the cache.
5208 for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5209 let v = format!("v{i}");
5210 cache.insert(i, v).await
5211 }
5212 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5213
5214 cache.run_pending_tasks().await;
5215 // Because of the slow listener, cache should get an over capacity.
5216 let mut expected_call_count = 3;
5217 assert_eq!(
5218 listener_call_count.load(Ordering::Acquire) as u64,
5219 expected_call_count
5220 );
5221 assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
5222
5223 loop {
5224 cache.run_pending_tasks().await;
5225
5226 expected_call_count += 3;
5227 if expected_call_count > MAX_CAPACITY {
5228 expected_call_count = MAX_CAPACITY;
5229 }
5230
5231 let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
5232 assert_eq!(actual_count, expected_call_count);
5233 let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
5234 assert_eq!(cache.entry_count(), expected_entry_count);
5235
5236 if expected_call_count >= MAX_CAPACITY {
5237 break;
5238 }
5239 }
5240
5241 assert_eq!(cache.entry_count(), MAX_CAPACITY);
5242 }
5243
5244 // NOTE: To enable the panic logging, run the following command:
5245 //
5246 // RUST_LOG=moka=info cargo test --features 'future, logging' -- \
5247 // future::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
5248 //
5249 #[tokio::test]
5250 async fn recover_from_panicking_eviction_listener() {
5251 #[cfg(feature = "logging")]
5252 let _ = env_logger::builder().is_test(true).try_init();
5253
5254 // The following `Vec`s will hold actual and expected notifications.
5255 let actual = Arc::new(Mutex::new(Vec::new()));
5256 let mut expected = Vec::new();
5257
5258 // Create an eviction listener that panics when it see
5259 // a value "panic now!".
5260 let a1 = Arc::clone(&actual);
5261 let listener = move |k, v, cause| -> ListenerFuture {
5262 let a2 = Arc::clone(&a1);
5263 async move {
5264 if v == "panic now!" {
5265 panic!("Panic now!");
5266 }
5267 a2.lock().await.push((k, v, cause));
5268 }
5269 .boxed()
5270 };
5271
5272 // Create a cache with the eviction listener.
5273 let mut cache = Cache::builder()
5274 .name("My Future Cache")
5275 .async_eviction_listener(listener)
5276 .build();
5277 cache.reconfigure_for_testing().await;
5278
5279 // Make the cache exterior immutable.
5280 let cache = cache;
5281
5282 // Insert an okay value.
5283 cache.insert("alice", "a0").await;
5284 cache.run_pending_tasks().await;
5285
5286 // Insert a value that will cause the eviction listener to panic.
5287 cache.insert("alice", "panic now!").await;
5288 expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
5289 cache.run_pending_tasks().await;
5290
5291 // Insert an okay value. This will replace the previous
5292 // value "panic now!" so the eviction listener will panic.
5293 cache.insert("alice", "a2").await;
5294 cache.run_pending_tasks().await;
5295 // No more removal notification should be sent.
5296
5297 // Invalidate the okay value.
5298 cache.invalidate(&"alice").await;
5299 cache.run_pending_tasks().await;
5300
5301 verify_notification_vec(&cache, actual, &expected).await;
5302 }
5303
5304 #[tokio::test]
5305 async fn cancel_future_while_running_pending_tasks() {
5306 use crate::future::FutureExt;
5307 use futures_util::future::poll_immediate;
5308 use tokio::task::yield_now;
5309
5310 let listener_initiation_count: Arc<AtomicU32> = Default::default();
5311 let listener_completion_count: Arc<AtomicU32> = Default::default();
5312
5313 let listener = {
5314 // Variables to capture.
5315 let init_count = Arc::clone(&listener_initiation_count);
5316 let comp_count = Arc::clone(&listener_completion_count);
5317
5318 // Our eviction listener closure.
5319 move |_k, _v, _r| {
5320 init_count.fetch_add(1, Ordering::AcqRel);
5321 let comp_count1 = Arc::clone(&comp_count);
5322
5323 async move {
5324 yield_now().await;
5325 comp_count1.fetch_add(1, Ordering::AcqRel);
5326 }
5327 .boxed()
5328 }
5329 };
5330
5331 let (clock, mock) = Clock::mock();
5332
5333 let mut cache: Cache<u32, u32> = Cache::builder()
5334 .time_to_live(Duration::from_millis(10))
5335 .async_eviction_listener(listener)
5336 .clock(clock)
5337 .build();
5338
5339 cache.reconfigure_for_testing().await;
5340
5341 // Make the cache exterior immutable.
5342 let cache = cache;
5343
5344 cache.insert(1, 1).await;
5345 assert_eq!(cache.run_pending_tasks_initiation_count(), 0);
5346 assert_eq!(cache.run_pending_tasks_completion_count(), 0);
5347
5348 // Key 1 is not yet expired.
5349 mock.increment(Duration::from_millis(7));
5350
5351 cache.run_pending_tasks().await;
5352 assert_eq!(cache.run_pending_tasks_initiation_count(), 1);
5353 assert_eq!(cache.run_pending_tasks_completion_count(), 1);
5354 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 0);
5355 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5356
5357 // Now key 1 is expired, so the eviction listener should be called when we
5358 // call run_pending_tasks() and poll the returned future.
5359 mock.increment(Duration::from_millis(7));
5360
5361 let fut = cache.run_pending_tasks();
5362 // Poll the fut only once, and drop it. The fut should not be completed (so
5363 // it is cancelled) because the eviction listener performed a yield_now().
5364 assert!(poll_immediate(fut).await.is_none());
5365
5366 // The task is initiated but not completed.
5367 assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
5368 assert_eq!(cache.run_pending_tasks_completion_count(), 1);
5369 // The listener is initiated but not completed.
5370 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5371 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5372
5373 // This will resume the task and the listener, and continue polling
5374 // until complete.
5375 cache.run_pending_tasks().await;
5376 // Now the task is completed.
5377 assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
5378 assert_eq!(cache.run_pending_tasks_completion_count(), 2);
5379 // Now the listener is completed.
5380 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5381 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5382 }
5383
5384 #[tokio::test]
5385 async fn cancel_future_while_calling_eviction_listener() {
5386 use crate::future::FutureExt;
5387 use futures_util::future::poll_immediate;
5388 use tokio::task::yield_now;
5389
5390 let listener_initiation_count: Arc<AtomicU32> = Default::default();
5391 let listener_completion_count: Arc<AtomicU32> = Default::default();
5392
5393 let listener = {
5394 // Variables to capture.
5395 let init_count = Arc::clone(&listener_initiation_count);
5396 let comp_count = Arc::clone(&listener_completion_count);
5397
5398 // Our eviction listener closure.
5399 move |_k, _v, _r| {
5400 init_count.fetch_add(1, Ordering::AcqRel);
5401 let comp_count1 = Arc::clone(&comp_count);
5402
5403 async move {
5404 yield_now().await;
5405 comp_count1.fetch_add(1, Ordering::AcqRel);
5406 }
5407 .boxed()
5408 }
5409 };
5410
5411 let mut cache: Cache<u32, u32> = Cache::builder()
5412 .time_to_live(Duration::from_millis(10))
5413 .async_eviction_listener(listener)
5414 .build();
5415
5416 cache.reconfigure_for_testing().await;
5417
5418 // Make the cache exterior immutable.
5419 let cache = cache;
5420
5421 // ------------------------------------------------------------
5422 // Interrupt the eviction listener while calling `insert`
5423 // ------------------------------------------------------------
5424
5425 cache.insert(1, 1).await;
5426
5427 let fut = cache.insert(1, 2);
5428 // Poll the fut only once, and drop it. The fut should not be completed (so
5429 // it is cancelled) because the eviction listener performed a yield_now().
5430 assert!(poll_immediate(fut).await.is_none());
5431
5432 // The listener is initiated but not completed.
5433 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5434 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5435
5436 // This will call retry_interrupted_ops() and resume the interrupted
5437 // listener.
5438 cache.run_pending_tasks().await;
5439 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5440 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5441
5442 // ------------------------------------------------------------
5443 // Interrupt the eviction listener while calling `invalidate`
5444 // ------------------------------------------------------------
5445
5446 let fut = cache.invalidate(&1);
5447 // Cancel the fut after one poll.
5448 assert!(poll_immediate(fut).await.is_none());
5449 // The listener is initiated but not completed.
5450 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5451 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5452
5453 // This will call retry_interrupted_ops() and resume the interrupted
5454 // listener.
5455 cache.get(&99).await;
5456 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5457 assert_eq!(listener_completion_count.load(Ordering::Acquire), 2);
5458
5459 // ------------------------------------------------------------
5460 // Ensure retry_interrupted_ops() is called
5461 // ------------------------------------------------------------
5462
5463 // Repeat the same test with `insert`, but this time, call different methods
5464 // to ensure retry_interrupted_ops() is called.
5465 let prepare = || async {
5466 cache.invalidate(&1).await;
5467
5468 // Reset the counters.
5469 listener_initiation_count.store(0, Ordering::Release);
5470 listener_completion_count.store(0, Ordering::Release);
5471
5472 cache.insert(1, 1).await;
5473
5474 let fut = cache.insert(1, 2);
5475 // Poll the fut only once, and drop it. The fut should not be completed (so
5476 // it is cancelled) because the eviction listener performed a yield_now().
5477 assert!(poll_immediate(fut).await.is_none());
5478
5479 // The listener is initiated but not completed.
5480 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5481 assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5482 };
5483
5484 // Methods to test:
5485 //
5486 // - run_pending_tasks (Already tested in a previous test)
5487 // - get (Already tested in a previous test)
5488 // - insert
5489 // - invalidate
5490 // - remove
5491
5492 // insert
5493 prepare().await;
5494 cache.insert(99, 99).await;
5495 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5496 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5497
5498 // invalidate
5499 prepare().await;
5500 cache.invalidate(&88).await;
5501 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5502 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5503
5504 // remove
5505 prepare().await;
5506 cache.remove(&77).await;
5507 assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5508 assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5509 }
5510
5511 #[tokio::test]
5512 async fn cancel_future_while_scheduling_write_op() {
5513 use futures_util::future::poll_immediate;
5514
5515 let mut cache: Cache<u32, u32> = Cache::builder().build();
5516 cache.reconfigure_for_testing().await;
5517
5518 // Make the cache exterior immutable.
5519 let cache = cache;
5520
5521 // --------------------------------------------------------------
5522 // Interrupt `insert` while blocking in `schedule_write_op`
5523 // --------------------------------------------------------------
5524
5525 cache
5526 .schedule_write_op_should_block
5527 .store(true, Ordering::Release);
5528 let fut = cache.insert(1, 1);
5529 // Poll the fut only once, and drop it. The fut should not be completed (so
5530 // it is cancelled) because schedule_write_op should be awaiting for a lock.
5531 assert!(poll_immediate(fut).await.is_none());
5532
5533 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5534 assert_eq!(cache.base.write_op_ch.len(), 0);
5535
5536 // This should retry the interrupted operation.
5537 cache
5538 .schedule_write_op_should_block
5539 .store(false, Ordering::Release);
5540 cache.get(&99).await;
5541 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5542 assert_eq!(cache.base.write_op_ch.len(), 1);
5543
5544 cache.run_pending_tasks().await;
5545 assert_eq!(cache.base.write_op_ch.len(), 0);
5546
5547 // --------------------------------------------------------------
5548 // Interrupt `invalidate` while blocking in `schedule_write_op`
5549 // --------------------------------------------------------------
5550
5551 cache
5552 .schedule_write_op_should_block
5553 .store(true, Ordering::Release);
5554 let fut = cache.invalidate(&1);
5555 // Poll the fut only once, and drop it. The fut should not be completed (so
5556 // it is cancelled) because schedule_write_op should be awaiting for a lock.
5557 assert!(poll_immediate(fut).await.is_none());
5558
5559 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5560 assert_eq!(cache.base.write_op_ch.len(), 0);
5561
5562 // This should retry the interrupted operation.
5563 cache
5564 .schedule_write_op_should_block
5565 .store(false, Ordering::Release);
5566 cache.get(&99).await;
5567 assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5568 assert_eq!(cache.base.write_op_ch.len(), 1);
5569
5570 cache.run_pending_tasks().await;
5571 assert_eq!(cache.base.write_op_ch.len(), 0);
5572 }
5573
5574 // This test ensures that the `contains_key`, `get` and `invalidate` can use
5575 // borrowed form `&[u8]` for key with type `Vec<u8>`.
5576 // https://github.com/moka-rs/moka/issues/166
5577 #[tokio::test]
5578 async fn borrowed_forms_of_key() {
5579 let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5580
5581 let key = vec![1_u8];
5582 cache.insert(key.clone(), ()).await;
5583
5584 // key as &Vec<u8>
5585 let key_v: &Vec<u8> = &key;
5586 assert!(cache.contains_key(key_v));
5587 assert_eq!(cache.get(key_v).await, Some(()));
5588 cache.invalidate(key_v).await;
5589
5590 cache.insert(key, ()).await;
5591
5592 // key as &[u8]
5593 let key_s: &[u8] = &[1_u8];
5594 assert!(cache.contains_key(key_s));
5595 assert_eq!(cache.get(key_s).await, Some(()));
5596 cache.invalidate(key_s).await;
5597 }
5598
5599 #[tokio::test]
5600 async fn drop_value_immediately_after_eviction() {
5601 use crate::common::test_utils::{Counters, Value};
5602
5603 const MAX_CAPACITY: u32 = 500;
5604 const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
5605
5606 let counters = Arc::new(Counters::default());
5607 let counters1 = Arc::clone(&counters);
5608
5609 let listener = move |_k, _v, cause| match cause {
5610 RemovalCause::Size => counters1.incl_evicted(),
5611 RemovalCause::Explicit => counters1.incl_invalidated(),
5612 _ => (),
5613 };
5614
5615 let mut cache = Cache::builder()
5616 .max_capacity(MAX_CAPACITY as u64)
5617 .eviction_listener(listener)
5618 .build();
5619 cache.reconfigure_for_testing().await;
5620
5621 // Make the cache exterior immutable.
5622 let cache = cache;
5623
5624 for key in 0..KEYS {
5625 let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
5626 cache.insert(key, value).await;
5627 counters.incl_inserted();
5628 cache.run_pending_tasks().await;
5629 }
5630
5631 let eviction_count = KEYS - MAX_CAPACITY;
5632
5633 // Retries will be needed when testing in a QEMU VM.
5634 const MAX_RETRIES: usize = 5;
5635 let mut retries = 0;
5636 loop {
5637 // Ensure all scheduled notifications have been processed.
5638 std::thread::sleep(Duration::from_millis(500));
5639
5640 if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
5641 if retries <= MAX_RETRIES {
5642 retries += 1;
5643 cache.run_pending_tasks().await;
5644 continue;
5645 } else {
5646 assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
5647 assert_eq!(
5648 counters.value_dropped(),
5649 eviction_count,
5650 "Retries exhausted"
5651 );
5652 }
5653 }
5654
5655 assert_eq!(counters.inserted(), KEYS, "inserted");
5656 assert_eq!(counters.value_created(), KEYS, "value_created");
5657 assert_eq!(counters.evicted(), eviction_count, "evicted");
5658 assert_eq!(counters.invalidated(), 0, "invalidated");
5659 assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
5660
5661 break;
5662 }
5663
5664 for key in 0..KEYS {
5665 cache.invalidate(&key).await;
5666 cache.run_pending_tasks().await;
5667 }
5668
5669 let mut retries = 0;
5670 loop {
5671 // Ensure all scheduled notifications have been processed.
5672 std::thread::sleep(Duration::from_millis(500));
5673
5674 if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
5675 if retries <= MAX_RETRIES {
5676 retries += 1;
5677 cache.run_pending_tasks().await;
5678 continue;
5679 } else {
5680 assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
5681 assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
5682 }
5683 }
5684
5685 assert_eq!(counters.inserted(), KEYS, "inserted");
5686 assert_eq!(counters.value_created(), KEYS, "value_created");
5687 assert_eq!(counters.evicted(), eviction_count, "evicted");
5688 assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
5689 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5690
5691 break;
5692 }
5693
5694 std::mem::drop(cache);
5695 assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5696 }
5697
5698 // https://github.com/moka-rs/moka/issues/383
5699 #[tokio::test]
5700 async fn ensure_gc_runs_when_dropping_cache() {
5701 let cache = Cache::builder().build();
5702 let val = Arc::new(0);
5703 cache
5704 .get_with(1, std::future::ready(Arc::clone(&val)))
5705 .await;
5706 drop(cache);
5707 assert_eq!(Arc::strong_count(&val), 1);
5708 }
5709
5710 #[tokio::test]
5711 async fn test_debug_format() {
5712 let cache = Cache::new(10);
5713 cache.insert('a', "alice").await;
5714 cache.insert('b', "bob").await;
5715 cache.insert('c', "cindy").await;
5716
5717 let debug_str = format!("{cache:?}");
5718 assert!(debug_str.starts_with('{'));
5719 assert!(debug_str.contains(r#"'a': "alice""#));
5720 assert!(debug_str.contains(r#"'b': "bob""#));
5721 assert!(debug_str.contains(r#"'c': "cindy""#));
5722 assert!(debug_str.ends_with('}'));
5723 }
5724
5725 type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5726
5727 async fn verify_notification_vec<K, V, S>(
5728 cache: &Cache<K, V, S>,
5729 actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5730 expected: &[NotificationTuple<K, V>],
5731 ) where
5732 K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5733 V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5734 S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5735 {
5736 // Retries will be needed when testing in a QEMU VM.
5737 const MAX_RETRIES: usize = 5;
5738 let mut retries = 0;
5739 loop {
5740 // Ensure all scheduled notifications have been processed.
5741 std::thread::sleep(Duration::from_millis(500));
5742
5743 let actual = &*actual.lock().await;
5744 if actual.len() != expected.len() {
5745 if retries <= MAX_RETRIES {
5746 retries += 1;
5747 cache.run_pending_tasks().await;
5748 continue;
5749 } else {
5750 assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5751 }
5752 }
5753
5754 for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5755 assert_eq!(actual, expected, "expected[{i}]");
5756 }
5757
5758 break;
5759 }
5760 }
5761}