// moka/future/cache.rs

1use equivalent::Equivalent;
2
3use super::{
4    base_cache::BaseCache,
5    value_initializer::{InitResult, ValueInitializer},
6    CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector,
7    WriteOp,
8};
9use crate::{
10    common::{concurrent::Weigher, time::Clock, HousekeeperConfig},
11    notification::AsyncEvictionListener,
12    ops::compute::{self, CompResult},
13    policy::{EvictionPolicy, ExpirationPolicy},
14    Entry, Policy, PredicateError,
15};
16
17#[cfg(feature = "unstable-debug-counters")]
18use crate::common::concurrent::debug_counters::CacheDebugStats;
19
20use std::{
21    collections::hash_map::RandomState,
22    fmt,
23    future::Future,
24    hash::{BuildHasher, Hash},
25    pin::Pin,
26    sync::Arc,
27};
28
29#[cfg(test)]
30use std::sync::atomic::{AtomicBool, Ordering};
31
32/// A thread-safe, futures-aware concurrent in-memory cache.
33///
34/// `Cache` supports full concurrency of retrievals and a high expected concurrency
35/// for updates. It utilizes a lock-free concurrent hash table as the central
36/// key-value storage. It performs a best-effort bounding of the map using an entry
37/// replacement algorithm to determine which entries to evict when the capacity is
38/// exceeded.
39///
40/// To use this cache, enable a crate feature called "future".
41///
42/// # Table of Contents
43///
44/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
45/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
46/// - [Sharing a cache across asynchronous tasks](#sharing-a-cache-across-asynchronous-tasks)
47///     - [No lock is needed](#no-lock-is-needed)
48/// - [Hashing Algorithm](#hashing-algorithm)
49/// - [Example: Size-based Eviction](#example-size-based-eviction)
50/// - [Example: Time-based Expirations](#example-time-based-expirations)
51///     - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
52///     - [Per-entry expiration policy](#per-entry-expiration-policy)
53/// - [Example: Eviction Listener](#example-eviction-listener)
54///     - [You should avoid eviction listener to panic](#you-should-avoid-eviction-listener-to-panic)
55///
56/// # Example: `insert`, `get` and `invalidate`
57///
/// Cache entries are manually added using [`insert`](#method.insert) or
59/// [`get_with`](#method.get_with) method, and are stored in the cache until either
60/// evicted or manually invalidated:
61///
62/// Here's an example of reading and updating a cache by using multiple asynchronous
63/// tasks with [Tokio][tokio-crate] runtime:
64///
65/// [tokio-crate]: https://crates.io/crates/tokio
66///
67///```rust
68/// // Cargo.toml
69/// //
70/// // [dependencies]
71/// // moka = { version = "0.12", features = ["future"] }
72/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
73/// // futures-util = "0.3"
74///
75/// use moka::future::Cache;
76///
77/// #[tokio::main]
78/// async fn main() {
79///     const NUM_TASKS: usize = 16;
80///     const NUM_KEYS_PER_TASK: usize = 64;
81///
82///     fn value(n: usize) -> String {
83///         format!("value {n}")
84///     }
85///
86///     // Create a cache that can store up to 10,000 entries.
87///     let cache = Cache::new(10_000);
88///
89///     // Spawn async tasks and write to and read from the cache.
90///     let tasks: Vec<_> = (0..NUM_TASKS)
91///         .map(|i| {
92///             // To share the same cache across the async tasks, clone it.
93///             // This is a cheap operation.
94///             let my_cache = cache.clone();
95///             let start = i * NUM_KEYS_PER_TASK;
96///             let end = (i + 1) * NUM_KEYS_PER_TASK;
97///
98///             tokio::spawn(async move {
99///                 // Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
100///                 for key in start..end {
101///                     my_cache.insert(key, value(key)).await;
102///                     // get() returns Option<String>, a clone of the stored value.
103///                     assert_eq!(my_cache.get(&key).await, Some(value(key)));
104///                 }
105///
///                 // Invalidate every 4th element of the inserted entries.
107///                 for key in (start..end).step_by(4) {
108///                     my_cache.invalidate(&key).await;
109///                 }
110///             })
111///         })
112///         .collect();
113///
114///     // Wait for all tasks to complete.
115///     futures_util::future::join_all(tasks).await;
116///
117///     // Verify the result.
118///     for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
119///         if key % 4 == 0 {
120///             assert_eq!(cache.get(&key).await, None);
121///         } else {
122///             assert_eq!(cache.get(&key).await, Some(value(key)));
123///         }
124///     }
125/// }
126/// ```
127///
128/// If you want to atomically initialize and insert a value when the key is not
129/// present, you might want to check other insertion methods
130/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
131///
132/// # Avoiding to clone the value at `get`
133///
134/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
135/// time `get` is called for an existing key, it creates a clone of the stored value
136/// `V` and returns it. This is because the `Cache` allows concurrent updates from
137/// threads so a value stored in the cache can be dropped or replaced at any time by
138/// any other thread. `get` cannot return a reference `&V` as it is impossible to
139/// guarantee the value outlives the reference.
140///
141/// If you want to store values that will be expensive to clone, wrap them by
142/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
143/// thread-safe reference-counted pointer and its `clone()` method is cheap.
144///
145/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
146///
147/// # Sharing a cache across asynchronous tasks
148///
149/// To share a cache across async tasks (or OS threads), do one of the followings:
150///
151/// - Create a clone of the cache by calling its `clone` method and pass it to other
152///   task.
153/// - If you are using a web application framework such as Actix Web or Axum, you can
154///   store a cache in Actix Web's [`web::Data`][actix-web-data] or Axum's
155///   [shared state][axum-state-extractor], and access it from each request handler.
156/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from
///   [once_cell][once-cell-crate] crate, and set it to a `static` variable.
158///
159/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
160/// reference-counted pointers to the internal data structures.
161///
162/// [once-cell-crate]: https://crates.io/crates/once_cell
163/// [actix-web-data]: https://docs.rs/actix-web/4.3.1/actix_web/web/struct.Data.html
164/// [axum-state-extractor]: https://docs.rs/axum/latest/axum/#sharing-state-with-handlers
165///
166/// ## No lock is needed
167///
168/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
169/// by the `Cache` are considered thread-safe, and can be safely called by multiple
170/// async tasks at the same time. No lock is needed.
171///
172/// [once-cell-crate]: https://crates.io/crates/once_cell
173///
174/// # Hashing Algorithm
175///
176/// By default, `Cache` uses a hashing algorithm selected to provide resistance
177/// against HashDoS attacks. It will be the same one used by
178/// `std::collections::HashMap`, which is currently SipHash 1-3.
179///
180/// While SipHash's performance is very competitive for medium sized keys, other
181/// hashing algorithms will outperform it for small keys such as integers as well as
182/// large keys such as long strings. However those algorithms will typically not
183/// protect against attacks such as HashDoS.
184///
185/// The hashing algorithm can be replaced on a per-`Cache` basis using the
186/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
187/// Many alternative algorithms are available on crates.io, such as the
188/// [AHash][ahash-crate] crate.
189///
190/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
191/// [ahash-crate]: https://crates.io/crates/ahash
192///
193/// # Example: Size-based Eviction
194///
195/// ```rust
196/// // Cargo.toml
197/// //
198/// // [dependencies]
199/// // moka = { version = "0.12", features = ["future"] }
200/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
201/// // futures-util = "0.3"
202///
203/// use moka::future::Cache;
204///
205/// #[tokio::main]
206/// async fn main() {
207///     // Evict based on the number of entries in the cache.
208///     let cache = Cache::builder()
209///         // Up to 10,000 entries.
210///         .max_capacity(10_000)
211///         // Create the cache.
212///         .build();
213///     cache.insert(1, "one".to_string()).await;
214///
215///     // Evict based on the byte length of strings in the cache.
216///     let cache = Cache::builder()
217///         // A weigher closure takes &K and &V and returns a u32
218///         // representing the relative size of the entry.
219///         .weigher(|_key, value: &String| -> u32 {
220///             value.len().try_into().unwrap_or(u32::MAX)
221///         })
222///         // This cache will hold up to 32MiB of values.
223///         .max_capacity(32 * 1024 * 1024)
224///         .build();
225///     cache.insert(2, "two".to_string()).await;
226/// }
227/// ```
228///
229/// If your cache should not grow beyond a certain size, use the `max_capacity`
230/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
231/// will try to evict entries that have not been used recently or very often.
232///
233/// At the cache creation time, a weigher closure can be set by the `weigher` method
234/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
235/// returns a `u32` representing the relative size of the entry:
236///
/// - If the `weigher` is _not_ set, the cache will treat each entry as having the same
238///   size of `1`. This means the cache will be bounded by the number of entries.
239/// - If the `weigher` is set, the cache will call the weigher to calculate the
240///   weighted size (relative size) on an entry. This means the cache will be bounded
241///   by the total weighted size of entries.
242///
243/// Note that weighted sizes are not used when making eviction selections.
244///
245/// [builder-struct]: ./struct.CacheBuilder.html
246///
247/// # Example: Time-based Expirations
248///
249/// ## Cache-level TTL and TTI policies
250///
251/// `Cache` supports the following cache-level expiration policies:
252///
253/// - **Time to live (TTL)**: A cached entry will be expired after the specified
254///   duration past from `insert`.
255/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
256///   duration past from `get` or `insert`.
257///
/// These are cache-level expiration policies; all entries in the cache will have
259/// the same TTL and/or TTI durations. If you want to set different expiration
260/// durations for different entries, see the next section.
261///
262/// ```rust
263/// // Cargo.toml
264/// //
265/// // [dependencies]
266/// // moka = { version = "0.12", features = ["future"] }
267/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
268/// // futures-util = "0.3"
269///
270/// use moka::future::Cache;
271/// use std::time::Duration;
272///
273/// #[tokio::main]
274/// async fn main() {
275///     let cache = Cache::builder()
276///         // Time to live (TTL): 30 minutes
277///         .time_to_live(Duration::from_secs(30 * 60))
278///         // Time to idle (TTI):  5 minutes
279///         .time_to_idle(Duration::from_secs( 5 * 60))
280///         // Create the cache.
281///         .build();
282///
283///     // This entry will expire after 5 minutes (TTI) if there is no get().
284///     cache.insert(0, "zero").await;
285///
286///     // This get() will extend the entry life for another 5 minutes.
///     cache.get(&0).await;
288///
289///     // Even though we keep calling get(), the entry will expire
290///     // after 30 minutes (TTL) from the insert().
291/// }
292/// ```
293///
294/// ## Per-entry expiration policy
295///
296/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
297///
298/// `Expiry` trait provides three callback methods:
299/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
300/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
301/// updated, one of these methods is called. These methods return an
302/// `Option<Duration>`, which is used as the expiration duration of the entry.
303///
304/// `Expiry` trait provides the default implementations of these methods, so you will
305/// implement only the methods you want to customize.
306///
307/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
308/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
309/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
310///
311/// ```rust
312/// // Cargo.toml
313/// //
314/// // [dependencies]
315/// // moka = { version = "0.12", features = ["future"] }
316/// // tokio = { version = "1", features = ["rt-multi-thread", "macros", "time" ] }
317///
318/// use moka::{future::Cache, Expiry};
319/// use std::time::{Duration, Instant};
320///
321/// // In this example, we will create a `future::Cache` with `u32` as the key, and
322/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
323/// // expiration of the value, and `String` is the application data of the value.
324///
325/// /// An enum to represent the expiration of a value.
326/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
327/// pub enum Expiration {
328///     /// The value never expires.
329///     Never,
330///     /// The value expires after a short time. (5 seconds in this example)
331///     AfterShortTime,
332///     /// The value expires after a long time. (15 seconds in this example)
333///     AfterLongTime,
334/// }
335///
336/// impl Expiration {
337///     /// Returns the duration of this expiration.
338///     pub fn as_duration(&self) -> Option<Duration> {
339///         match self {
340///             Expiration::Never => None,
341///             Expiration::AfterShortTime => Some(Duration::from_secs(5)),
342///             Expiration::AfterLongTime => Some(Duration::from_secs(15)),
343///         }
344///     }
345/// }
346///
347/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
348/// /// default implementations of three callback methods `expire_after_create`,
349/// /// `expire_after_read`, and `expire_after_update`.
350/// ///
351/// /// In this example, we only override the `expire_after_create` method.
352/// pub struct MyExpiry;
353///
354/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
355///     /// Returns the duration of the expiration of the value that was just
356///     /// created.
357///     fn expire_after_create(
358///         &self,
359///         _key: &u32,
360///         value: &(Expiration, String),
361///         _current_time: Instant,
362///     ) -> Option<Duration> {
363///         let duration = value.0.as_duration();
364///         println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
365///         duration
366///     }
367/// }
368///
369/// #[tokio::main]
370/// async fn main() {
371///     // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
372///     // eviction listener.
373///     let expiry = MyExpiry;
374///
375///     let eviction_listener = |key, _value, cause| {
376///         println!("Evicted key {key}. Cause: {cause:?}");
377///     };
378///
379///     let cache = Cache::builder()
380///         .max_capacity(100)
381///         .expire_after(expiry)
382///         .eviction_listener(eviction_listener)
383///         .build();
384///
385///     // Insert some entries into the cache with different expirations.
386///     cache
387///         .get_with(0, async { (Expiration::AfterShortTime, "a".to_string()) })
388///         .await;
389///
390///     cache
391///         .get_with(1, async { (Expiration::AfterLongTime, "b".to_string()) })
392///         .await;
393///
394///     cache
395///         .get_with(2, async { (Expiration::Never, "c".to_string()) })
396///         .await;
397///
398///     // Verify that all the inserted entries exist.
399///     assert!(cache.contains_key(&0));
400///     assert!(cache.contains_key(&1));
401///     assert!(cache.contains_key(&2));
402///
403///     // Sleep for 6 seconds. Key 0 should expire.
404///     println!("\nSleeping for 6 seconds...\n");
405///     tokio::time::sleep(Duration::from_secs(6)).await;
406///     cache.run_pending_tasks().await;
407///     println!("Entry count: {}", cache.entry_count());
408///
409///     // Verify that key 0 has been evicted.
410///     assert!(!cache.contains_key(&0));
411///     assert!(cache.contains_key(&1));
412///     assert!(cache.contains_key(&2));
413///
414///     // Sleep for 10 more seconds. Key 1 should expire.
415///     println!("\nSleeping for 10 seconds...\n");
416///     tokio::time::sleep(Duration::from_secs(10)).await;
417///     cache.run_pending_tasks().await;
418///     println!("Entry count: {}", cache.entry_count());
419///
420///     // Verify that key 1 has been evicted.
421///     assert!(!cache.contains_key(&1));
422///     assert!(cache.contains_key(&2));
423///
424///     // Manually invalidate key 2.
425///     cache.invalidate(&2).await;
426///     assert!(!cache.contains_key(&2));
427///
428///     println!("\nSleeping for a second...\n");
429///     tokio::time::sleep(Duration::from_secs(1)).await;
430///     cache.run_pending_tasks().await;
431///     println!("Entry count: {}", cache.entry_count());
432///
433///     println!("\nDone!");
434/// }
435/// ```
436///
437/// # Example: Eviction Listener
438///
439/// A `Cache` can be configured with an eviction listener, a closure that is called
440/// every time there is a cache eviction. The listener takes three parameters: the
441/// key and value of the evicted entry, and the
442/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
443/// entry was evicted.
444///
445/// An eviction listener can be used to keep other data structures in sync with the
446/// cache, for example.
447///
448/// The following example demonstrates how to use an eviction listener with
449/// time-to-live expiration to manage the lifecycle of temporary files on a
450/// filesystem. The cache stores the paths of the files, and when one of them has
451/// expired, the eviction listener will be called with the path, so it can remove the
452/// file from the filesystem.
453///
454/// ```rust
455/// // Cargo.toml
456/// //
457/// // [dependencies]
458/// // anyhow = "1.0"
459/// // uuid = { version = "1.1", features = ["v4"] }
460/// // tokio = { version = "1.18", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] }
461///
462/// use moka::{future::Cache, notification::ListenerFuture};
463/// // FutureExt trait provides the boxed method.
464/// use moka::future::FutureExt;
465///
466/// use anyhow::{anyhow, Context};
467/// use std::{
468///     io,
469///     path::{Path, PathBuf},
470///     sync::Arc,
471///     time::Duration,
472/// };
473/// use tokio::{fs, sync::RwLock};
474/// use uuid::Uuid;
475///
476/// /// The DataFileManager writes, reads and removes data files.
477/// struct DataFileManager {
478///     base_dir: PathBuf,
479///     file_count: usize,
480/// }
481///
482/// impl DataFileManager {
483///     fn new(base_dir: PathBuf) -> Self {
484///         Self {
485///             base_dir,
486///             file_count: 0,
487///         }
488///     }
489///
490///     async fn write_data_file(
491///         &mut self,
492///         key: impl AsRef<str>,
493///         contents: String
494///     ) -> io::Result<PathBuf> {
495///         // Use the key as a part of the filename.
496///         let mut path = self.base_dir.to_path_buf();
497///         path.push(key.as_ref());
498///
499///         assert!(!path.exists(), "Path already exists: {path:?}");
500///
501///         // create the file at the path and write the contents to the file.
502///         fs::write(&path, contents).await?;
503///         self.file_count += 1;
504///         println!("Created a data file at {path:?} (file count: {})", self.file_count);
505///         Ok(path)
506///     }
507///
508///     async fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
509///         // Reads the contents of the file at the path, and return the contents.
510///         fs::read_to_string(path).await
511///     }
512///
513///     async fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
514///         // Remove the file at the path.
515///         fs::remove_file(path.as_ref()).await?;
516///         self.file_count -= 1;
517///         println!(
518///             "Removed a data file at {:?} (file count: {})",
519///             path.as_ref(),
520///             self.file_count
521///         );
522///
523///         Ok(())
524///     }
525/// }
526///
527/// #[tokio::main]
528/// async fn main() -> anyhow::Result<()> {
529///     // Create an instance of the DataFileManager and wrap it with
530///     // Arc<RwLock<_>> so it can be shared across threads.
531///     let mut base_dir = std::env::temp_dir();
532///     base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
533///     println!("base_dir: {base_dir:?}");
534///     std::fs::create_dir(&base_dir)?;
535///
536///     let file_mgr = DataFileManager::new(base_dir);
537///     let file_mgr = Arc::new(RwLock::new(file_mgr));
538///
539///     let file_mgr1 = Arc::clone(&file_mgr);
540///     let rt = tokio::runtime::Handle::current();
541///
542///     // Create an eviction listener closure.
543///     let eviction_listener = move |k, v: PathBuf, cause| -> ListenerFuture {
544///         println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
545///         let file_mgr2 = Arc::clone(&file_mgr1);
546///
547///         // Create a Future that removes the data file at the path `v`.
548///         async move {
549///             // Acquire the write lock of the DataFileManager.
550///             let mut mgr = file_mgr2.write().await;
551///             // Remove the data file. We must handle error cases here to
552///             // prevent the listener from panicking.
553///             if let Err(_e) = mgr.remove_data_file(v.as_path()).await {
554///                 eprintln!("Failed to remove a data file at {v:?}");
555///             }
556///         }
557///         // Convert the regular Future into ListenerFuture. This method is
558///         // provided by moka::future::FutureExt trait.
559///         .boxed()
560///     };
561///
562///     // Create the cache. Set time to live for two seconds and set the
563///     // eviction listener.
564///     let cache = Cache::builder()
565///         .max_capacity(100)
566///         .time_to_live(Duration::from_secs(2))
567///         .async_eviction_listener(eviction_listener)
568///         .build();
569///
570///     // Insert an entry to the cache.
571///     // This will create and write a data file for the key "user1", store the
572///     // path of the file to the cache, and return it.
573///     println!("== try_get_with()");
574///     let key = "user1";
575///     let path = cache
576///         .try_get_with(key, async {
577///             let mut mgr = file_mgr.write().await;
578///             let path = mgr
579///                 .write_data_file(key, "user data".into())
580///                 .await
581///                 .with_context(|| format!("Failed to create a data file"))?;
582///             Ok(path) as anyhow::Result<_>
583///         })
584///         .await
585///         .map_err(|e| anyhow!("{e}"))?;
586///
587///     // Read the data file at the path and print the contents.
588///     println!("\n== read_data_file()");
589///     {
590///         let mgr = file_mgr.read().await;
591///         let contents = mgr
592///             .read_data_file(path.as_path())
593///             .await
594///             .with_context(|| format!("Failed to read data from {path:?}"))?;
595///         println!("contents: {contents}");
596///     }
597///
598///     // Sleep for five seconds. While sleeping, the cache entry for key "user1"
599///     // will be expired and evicted, so the eviction listener will be called to
600///     // remove the file.
601///     tokio::time::sleep(Duration::from_secs(5)).await;
602///
///     cache.run_pending_tasks().await;
604///
605///     Ok(())
606/// }
607/// ```
608///
609/// ## You should avoid eviction listener to panic
610///
/// It is very important that an eviction listener closure never panics.
612/// Otherwise, the cache will stop calling the listener after a panic. This is an
613/// intended behavior because the cache cannot know whether it is memory safe or not
614/// to call the panicked listener again.
615///
616/// When a listener panics, the cache will swallow the panic and disable the
617/// listener. If you want to know when a listener panics and the reason of the panic,
618/// you can enable an optional `logging` feature of Moka and check error-level logs.
619///
620/// To enable the `logging`, do the followings:
621///
622/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
623/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
624///    `info`, ...):
625///     - If you are using the `env_logger` crate, you can achieve this by setting
626///       `RUST_LOG` environment variable to `moka=error`.
/// 3. If you have more than one cache, you may want to set a distinct name for each
628///    cache by using cache builder's [`name`][builder-name-method] method. The name
629///    will appear in the log.
630///
631/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
632///
pub struct Cache<K, V, S = RandomState> {
    // The shared core implementation: concurrent hash table plus the
    // eviction/expiration policies and housekeeping machinery.
    pub(crate) base: BaseCache<K, V, S>,
    // Deduplicates concurrent value initializations (the `get_with` /
    // `try_get_with` family documented above). Shared between clones via `Arc`.
    value_initializer: Arc<ValueInitializer<K, V, S>>,

    // Test-only flag; the name suggests it makes scheduling a write operation
    // block to exercise contention paths — NOTE(review): confirm against the
    // tests that set it.
    #[cfg(test)]
    schedule_write_op_should_block: AtomicBool,
}
640
// SAFETY: `Cache` holds reference-counted pointers to internals designed for
// cross-thread sharing; the bounds below restrict this impl to type parameters
// that are themselves safe to move across threads.
// NOTE(review): a manual `unsafe impl` bypasses the compiler's auto-trait
// analysis — confirm no field type is `!Send` under these bounds.
unsafe impl<K, V, S> Send for Cache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}
648
// SAFETY: shared references to `Cache` only expose operations that are
// internally synchronized (see "No lock is needed" in the type docs), so it is
// sound to share `&Cache` across threads when the parameters satisfy the
// bounds below.
// NOTE(review): as with `Send`, this bypasses the auto-trait check — confirm
// all fields are `Sync` under these bounds.
unsafe impl<K, V, S> Sync for Cache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
656
657// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
658impl<K, V, S> Clone for Cache<K, V, S> {
659    /// Makes a clone of this shared cache.
660    ///
661    /// This operation is cheap as it only creates thread-safe reference counted
662    /// pointers to the shared internal data structures.
663    fn clone(&self) -> Self {
664        Self {
665            base: self.base.clone(),
666            value_initializer: Arc::clone(&self.value_initializer),
667
668            #[cfg(test)]
669            schedule_write_op_should_block: AtomicBool::new(
670                self.schedule_write_op_should_block.load(Ordering::Acquire),
671            ),
672        }
673    }
674}
675
676impl<K, V, S> fmt::Debug for Cache<K, V, S>
677where
678    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
679    V: fmt::Debug + Clone + Send + Sync + 'static,
680    // TODO: Remove these bounds from S.
681    S: BuildHasher + Clone + Send + Sync + 'static,
682{
683    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684        let mut d_map = f.debug_map();
685
686        for (k, v) in self {
687            d_map.entry(&k, &v);
688        }
689
690        d_map.finish()
691    }
692}
693
impl<K, V, S> Cache<K, V, S> {
    /// Returns this cache’s name, if one was set at creation time.
    pub fn name(&self) -> Option<&str> {
        self.base.name()
    }

    /// Returns a read-only cache policy of this cache.
    ///
    /// At this time, cache policy cannot be modified after cache creation.
    /// A future version may support to modify it.
    pub fn policy(&self) -> Policy {
        self.base.policy()
    }

    /// Returns an approximate number of entries in this cache.
    ///
    /// The value returned is _an estimate_; the actual count may differ if there are
    /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by calling
    /// `run_pending_tasks` first.
    ///
    /// # Example
    ///
    /// ```rust
    /// // Cargo.toml
    /// //
    /// // [dependencies]
    /// // moka = { version = "0.12", features = ["future"] }
    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
    /// use moka::future::Cache;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let cache = Cache::new(10);
    ///     cache.insert('n', "Netherland Dwarf").await;
    ///     cache.insert('l', "Lop Eared").await;
    ///     cache.insert('d', "Dutch").await;
    ///
    ///     // Ensure an entry exists.
    ///     assert!(cache.contains_key(&'n'));
    ///
    ///     // However, the following may print stale number zeros instead of threes.
    ///     println!("{}", cache.entry_count());   // -> 0
    ///     println!("{}", cache.weighted_size()); // -> 0
    ///
    ///     // To mitigate the inaccuracy, call `run_pending_tasks` to run pending
    ///     // internal tasks.
    ///     cache.run_pending_tasks().await;
    ///
    ///     // The following will print the actual numbers.
    ///     println!("{}", cache.entry_count());   // -> 3
    ///     println!("{}", cache.weighted_size()); // -> 3
    /// }
    /// ```
    ///
    pub fn entry_count(&self) -> u64 {
        self.base.entry_count()
    }

    /// Returns an approximate total weighted size of entries in this cache.
    ///
    /// The value returned is _an estimate_; the actual size may differ if there are
    /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by calling
    /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for a
    /// sample code.
    pub fn weighted_size(&self) -> u64 {
        self.base.weighted_size()
    }

    /// Returns internal debug counters. Only available when the
    /// `unstable-debug-counters` crate feature is enabled.
    #[cfg(feature = "unstable-debug-counters")]
    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-debug-counters")))]
    pub async fn debug_stats(&self) -> CacheDebugStats {
        self.base.debug_stats().await
    }
}
770
771impl<K, V> Cache<K, V, RandomState>
772where
773    K: Hash + Eq + Send + Sync + 'static,
774    V: Clone + Send + Sync + 'static,
775{
776    /// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
777    ///
778    /// To adjust various configuration knobs such as `initial_capacity` or
779    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
780    ///
781    /// [builder-struct]: ./struct.CacheBuilder.html
782    pub fn new(max_capacity: u64) -> Self {
783        let build_hasher = RandomState::default();
784        Self::with_everything(
785            None,
786            Some(max_capacity),
787            None,
788            build_hasher,
789            None,
790            EvictionPolicy::default(),
791            None,
792            ExpirationPolicy::default(),
793            HousekeeperConfig::default(),
794            false,
795            Clock::default(),
796        )
797    }
798
    /// Returns a [`CacheBuilder`][builder-struct], which can build a `Cache` with
    /// various configuration knobs.
801    ///
802    /// [builder-struct]: ./struct.CacheBuilder.html
    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
        // Start from the builder's default configuration; the caller customizes
        // it through the builder's methods.
        CacheBuilder::default()
    }
806}
807
808impl<K, V, S> Cache<K, V, S>
809where
810    K: Hash + Eq + Send + Sync + 'static,
811    V: Clone + Send + Sync + 'static,
812    S: BuildHasher + Clone + Send + Sync + 'static,
813{
    /// Constructs a `Cache` from the complete set of configuration knobs.
    ///
    /// This is the internal construction point used by `Cache::new` (and
    /// presumably by the `CacheBuilder` — its caller is outside this file).
    /// The `build_hasher` is cloned because it is shared between the base cache
    /// and the value initializer.
    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn with_everything(
        name: Option<String>,
        max_capacity: Option<u64>,
        initial_capacity: Option<usize>,
        build_hasher: S,
        weigher: Option<Weigher<K, V>>,
        eviction_policy: EvictionPolicy,
        eviction_listener: Option<AsyncEvictionListener<K, V>>,
        expiration_policy: ExpirationPolicy<K, V>,
        housekeeper_config: HousekeeperConfig,
        invalidator_enabled: bool,
        clock: Clock,
    ) -> Self {
        Self {
            base: BaseCache::new(
                name,
                max_capacity,
                initial_capacity,
                build_hasher.clone(),
                weigher,
                eviction_policy,
                eviction_listener,
                expiration_policy,
                housekeeper_config,
                invalidator_enabled,
                clock,
            ),
            value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),

            // Test-only knob; `Default::default()` for `AtomicBool` is `false`.
            #[cfg(test)]
            schedule_write_op_should_block: Default::default(), // false
        }
    }
849
850    /// Returns `true` if the cache contains a value for the key.
851    ///
852    /// Unlike the `get` method, this method is not considered a cache read operation,
853    /// so it does not update the historic popularity estimator or reset the idle
854    /// timer for the key.
855    ///
856    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
857    /// on the borrowed form _must_ match those for the key type.
858    pub fn contains_key<Q>(&self, key: &Q) -> bool
859    where
860        Q: Equivalent<K> + Hash + ?Sized,
861    {
862        self.base.contains_key_with_hash(key, self.base.hash(key))
863    }
864
865    /// Returns a _clone_ of the value corresponding to the key.
866    ///
867    /// If you want to store values that will be expensive to clone, wrap them by
868    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
869    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
870    ///
871    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
872    /// on the borrowed form _must_ match those for the key type.
873    ///
874    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
875    pub async fn get<Q>(&self, key: &Q) -> Option<V>
876    where
877        Q: Equivalent<K> + Hash + ?Sized,
878    {
879        let ignore_if = None as Option<&mut fn(&V) -> bool>;
880
881        self.base
882            .get_with_hash(key, self.base.hash(key), ignore_if, false, true)
883            .await
884            .map(Entry::into_value)
885    }
886
887    /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
888    /// select or insert an entry.
889    ///
890    /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
891    ///
892    /// # Example
893    ///
894    /// ```rust
895    /// // Cargo.toml
896    /// //
897    /// // [dependencies]
898    /// // moka = { version = "0.12", features = ["future"] }
899    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
900    ///
901    /// use moka::future::Cache;
902    ///
903    /// #[tokio::main]
904    /// async fn main() {
905    ///     let cache: Cache<String, u32> = Cache::new(100);
906    ///     let key = "key1".to_string();
907    ///
908    ///     let entry = cache.entry(key.clone()).or_insert(3).await;
909    ///     assert!(entry.is_fresh());
910    ///     assert_eq!(entry.key(), &key);
911    ///     assert_eq!(entry.into_value(), 3);
912    ///
913    ///     let entry = cache.entry(key).or_insert(6).await;
914    ///     // Not fresh because the value was already in the cache.
915    ///     assert!(!entry.is_fresh());
916    ///     assert_eq!(entry.into_value(), 3);
917    /// }
918    /// ```
919    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
920    where
921        K: Hash + Eq,
922    {
923        let hash = self.base.hash(&key);
924        OwnedKeyEntrySelector::new(key, hash, self)
925    }
926
    /// Takes a reference `&Q` of a key and returns a [`RefKeyEntrySelector`] that
928    /// can be used to select or insert an entry.
929    ///
930    /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
931    ///
932    /// # Example
933    ///
934    /// ```rust
935    /// // Cargo.toml
936    /// //
937    /// // [dependencies]
938    /// // moka = { version = "0.12", features = ["future"] }
939    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
940    ///
941    /// use moka::future::Cache;
942    ///
943    /// #[tokio::main]
944    /// async fn main() {
945    ///     let cache: Cache<String, u32> = Cache::new(100);
946    ///     let key = "key1".to_string();
947    ///
948    ///     let entry = cache.entry_by_ref(&key).or_insert(3).await;
949    ///     assert!(entry.is_fresh());
950    ///     assert_eq!(entry.key(), &key);
951    ///     assert_eq!(entry.into_value(), 3);
952    ///
953    ///     let entry = cache.entry_by_ref(&key).or_insert(6).await;
954    ///     // Not fresh because the value was already in the cache.
955    ///     assert!(!entry.is_fresh());
956    ///     assert_eq!(entry.into_value(), 3);
957    /// }
958    /// ```
959    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
960    where
961        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
962    {
963        let hash = self.base.hash(key);
964        RefKeyEntrySelector::new(key, hash, self)
965    }
966
967    /// Returns a _clone_ of the value corresponding to the key. If the value does
968    /// not exist, resolve the `init` future and inserts the output.
969    ///
970    /// # Concurrent calls on the same key
971    ///
972    /// This method guarantees that concurrent calls on the same not-existing key are
973    /// coalesced into one evaluation of the `init` future. Only one of the calls
974    /// evaluates its future, and other calls wait for that future to resolve.
975    ///
976    /// The following code snippet demonstrates this behavior:
977    ///
978    /// ```rust
979    /// // Cargo.toml
980    /// //
981    /// // [dependencies]
982    /// // moka = { version = "0.12", features = ["future"] }
983    /// // futures-util = "0.3"
984    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
985    /// use moka::future::Cache;
986    /// use std::sync::Arc;
987    ///
988    /// #[tokio::main]
989    /// async fn main() {
990    ///     const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
991    ///     let cache = Cache::new(100);
992    ///
993    ///     // Spawn four async tasks.
994    ///     let tasks: Vec<_> = (0..4_u8)
995    ///         .map(|task_id| {
996    ///             let my_cache = cache.clone();
997    ///             tokio::spawn(async move {
998    ///                 println!("Task {task_id} started.");
999    ///
1000    ///                 // Insert and get the value for key1. Although all four async
1001    ///                 // tasks will call `get_with` at the same time, the `init`
1002    ///                 // async block must be resolved only once.
1003    ///                 let value = my_cache
1004    ///                     .get_with("key1", async move {
1005    ///                         println!("Task {task_id} inserting a value.");
1006    ///                         Arc::new(vec![0u8; TEN_MIB])
1007    ///                     })
1008    ///                     .await;
1009    ///
1010    ///                 // Ensure the value exists now.
1011    ///                 assert_eq!(value.len(), TEN_MIB);
1012    ///                 assert!(my_cache.get(&"key1").await.is_some());
1013    ///
1014    ///                 println!("Task {task_id} got the value. (len: {})", value.len());
1015    ///             })
1016    ///         })
1017    ///         .collect();
1018    ///
1019    ///     // Run all tasks concurrently and wait for them to complete.
1020    ///     futures_util::future::join_all(tasks).await;
1021    /// }
1022    /// ```
1023    ///
1024    /// **A Sample Result**
1025    ///
    /// - The `init` future (async block) was resolved exactly once by task 3.
1027    /// - Other tasks were blocked until task 3 inserted the value.
1028    ///
1029    /// ```console
1030    /// Task 0 started.
1031    /// Task 3 started.
1032    /// Task 1 started.
1033    /// Task 2 started.
1034    /// Task 3 inserting a value.
1035    /// Task 3 got the value. (len: 10485760)
1036    /// Task 0 got the value. (len: 10485760)
1037    /// Task 1 got the value. (len: 10485760)
1038    /// Task 2 got the value. (len: 10485760)
1039    /// ```
1040    ///
1041    /// # Panics
1042    ///
1043    /// This method panics when the `init` future has panicked. When it happens, only
1044    /// the caller whose `init` future panicked will get the panic (e.g. only task 3
1045    /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1046    /// and 2 above), this method will restart and resolve one of the remaining
1047    /// `init` futures.
1048    ///
1049    pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V {
1050        futures_util::pin_mut!(init);
1051        let hash = self.base.hash(&key);
1052        let key = Arc::new(key);
1053        let replace_if = None as Option<fn(&V) -> bool>;
1054        self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
1055            .await
1056            .into_value()
1057    }
1058
1059    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
1060    /// key, you can pass a reference to the key. If the key does not exist in the
1061    /// cache, the key will be cloned to create new entry in the cache.
1062    pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
1063    where
1064        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1065    {
1066        futures_util::pin_mut!(init);
1067        let hash = self.base.hash(key);
1068        let replace_if = None as Option<fn(&V) -> bool>;
1069        self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
1070            .await
1071            .into_value()
1072    }
1073
1074    /// TODO: Remove this in v0.13.0.
1075    /// Deprecated, replaced with
1076    /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
1077    #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
1078    pub async fn get_with_if(
1079        &self,
1080        key: K,
1081        init: impl Future<Output = V>,
1082        replace_if: impl FnMut(&V) -> bool + Send,
1083    ) -> V {
1084        futures_util::pin_mut!(init);
1085        let hash = self.base.hash(&key);
1086        let key = Arc::new(key);
1087        self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
1088            .await
1089            .into_value()
1090    }
1091
1092    /// Returns a _clone_ of the value corresponding to the key. If the value does
1093    /// not exist, resolves the `init` future, and inserts the value if `Some(value)`
1094    /// was returned. If `None` was returned from the future, this method does not
1095    /// insert a value and returns `None`.
1096    ///
1097    /// # Concurrent calls on the same key
1098    ///
1099    /// This method guarantees that concurrent calls on the same not-existing key are
1100    /// coalesced into one evaluation of the `init` future. Only one of the calls
1101    /// evaluates its future, and other calls wait for that future to resolve.
1102    ///
1103    /// The following code snippet demonstrates this behavior:
1104    ///
1105    /// ```rust
1106    /// // Cargo.toml
1107    /// //
1108    /// // [dependencies]
1109    /// // moka = { version = "0.12", features = ["future"] }
1110    /// // futures-util = "0.3"
1111    /// // reqwest = "0.11"
1112    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1113    /// use moka::future::Cache;
1114    ///
1115    /// // This async function tries to get HTML from the given URI.
1116    /// async fn get_html(task_id: u8, uri: &str) -> Option<String> {
1117    ///     println!("get_html() called by task {task_id}.");
1118    ///     reqwest::get(uri).await.ok()?.text().await.ok()
1119    /// }
1120    ///
1121    /// #[tokio::main]
1122    /// async fn main() {
1123    ///     let cache = Cache::new(100);
1124    ///
1125    ///     // Spawn four async tasks.
1126    ///     let tasks: Vec<_> = (0..4_u8)
1127    ///         .map(|task_id| {
1128    ///             let my_cache = cache.clone();
1129    ///             tokio::spawn(async move {
1130    ///                 println!("Task {task_id} started.");
1131    ///
    ///                 // Try to insert and get the value for key1. Although
    ///                 // all four async tasks will call `optionally_get_with`
    ///                 // at the same time, get_html() must be called only once.
1135    ///                 let value = my_cache
1136    ///                     .optionally_get_with(
1137    ///                         "key1",
1138    ///                         get_html(task_id, "https://www.rust-lang.org"),
1139    ///                     ).await;
1140    ///
1141    ///                 // Ensure the value exists now.
1142    ///                 assert!(value.is_some());
1143    ///                 assert!(my_cache.get(&"key1").await.is_some());
1144    ///
1145    ///                 println!(
1146    ///                     "Task {task_id} got the value. (len: {})",
1147    ///                     value.unwrap().len()
1148    ///                 );
1149    ///             })
1150    ///         })
1151    ///         .collect();
1152    ///
1153    ///     // Run all tasks concurrently and wait for them to complete.
1154    ///     futures_util::future::join_all(tasks).await;
1155    /// }
1156    /// ```
1157    ///
1158    /// **A Sample Result**
1159    ///
1160    /// - `get_html()` was called exactly once by task 2.
1161    /// - Other tasks were blocked until task 2 inserted the value.
1162    ///
1163    /// ```console
1164    /// Task 1 started.
1165    /// Task 0 started.
1166    /// Task 2 started.
1167    /// Task 3 started.
1168    /// get_html() called by task 2.
1169    /// Task 2 got the value. (len: 19419)
1170    /// Task 1 got the value. (len: 19419)
1171    /// Task 0 got the value. (len: 19419)
1172    /// Task 3 got the value. (len: 19419)
1173    /// ```
1174    ///
1175    /// # Panics
1176    ///
1177    /// This method panics when the `init` future has panicked. When it happens, only
1178    /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1179    /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1180    /// and 3 above), this method will restart and resolve one of the remaining
1181    /// `init` futures.
1182    ///
1183    pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1184    where
1185        F: Future<Output = Option<V>>,
1186    {
1187        futures_util::pin_mut!(init);
1188        let hash = self.base.hash(&key);
1189        let key = Arc::new(key);
1190        self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1191            .await
1192            .map(Entry::into_value)
1193    }
1194
1195    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1196    /// of passing an owned key, you can pass a reference to the key. If the key does
1197    /// not exist in the cache, the key will be cloned to create new entry in the
1198    /// cache.
1199    pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1200    where
1201        F: Future<Output = Option<V>>,
1202        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1203    {
1204        futures_util::pin_mut!(init);
1205        let hash = self.base.hash(key);
1206        self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1207            .await
1208            .map(Entry::into_value)
1209    }
1210
1211    /// Returns a _clone_ of the value corresponding to the key. If the value does
1212    /// not exist, resolves the `init` future, and inserts the value if `Ok(value)`
1213    /// was returned. If `Err(_)` was returned from the future, this method does not
1214    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1215    ///
1216    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1217    ///
1218    /// # Concurrent calls on the same key
1219    ///
1220    /// This method guarantees that concurrent calls on the same not-existing key are
1221    /// coalesced into one evaluation of the `init` future (as long as these
1222    /// futures return the same error type). Only one of the calls evaluates its
1223    /// future, and other calls wait for that future to resolve.
1224    ///
1225    /// The following code snippet demonstrates this behavior:
1226    ///
1227    /// ```rust
1228    /// // Cargo.toml
1229    /// //
1230    /// // [dependencies]
1231    /// // moka = { version = "0.12", features = ["future"] }
1232    /// // futures-util = "0.3"
1233    /// // reqwest = "0.11"
1234    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1235    /// use moka::future::Cache;
1236    ///
1237    /// // This async function tries to get HTML from the given URI.
1238    /// async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
1239    ///     println!("get_html() called by task {task_id}.");
1240    ///     reqwest::get(uri).await?.text().await
1241    /// }
1242    ///
1243    /// #[tokio::main]
1244    /// async fn main() {
1245    ///     let cache = Cache::new(100);
1246    ///
1247    ///     // Spawn four async tasks.
1248    ///     let tasks: Vec<_> = (0..4_u8)
1249    ///         .map(|task_id| {
1250    ///             let my_cache = cache.clone();
1251    ///             tokio::spawn(async move {
1252    ///                 println!("Task {task_id} started.");
1253    ///
1254    ///                 // Try to insert and get the value for key1. Although
1255    ///                 // all four async tasks will call `try_get_with`
1256    ///                 // at the same time, get_html() must be called only once.
1257    ///                 let value = my_cache
1258    ///                     .try_get_with(
1259    ///                         "key1",
1260    ///                         get_html(task_id, "https://www.rust-lang.org"),
1261    ///                     ).await;
1262    ///
1263    ///                 // Ensure the value exists now.
1264    ///                 assert!(value.is_ok());
1265    ///                 assert!(my_cache.get(&"key1").await.is_some());
1266    ///
1267    ///                 println!(
1268    ///                     "Task {task_id} got the value. (len: {})",
1269    ///                     value.unwrap().len()
1270    ///                 );
1271    ///             })
1272    ///         })
1273    ///         .collect();
1274    ///
1275    ///     // Run all tasks concurrently and wait for them to complete.
1276    ///     futures_util::future::join_all(tasks).await;
1277    /// }
1278    /// ```
1279    ///
1280    /// **A Sample Result**
1281    ///
1282    /// - `get_html()` was called exactly once by task 2.
1283    /// - Other tasks were blocked until task 2 inserted the value.
1284    ///
1285    /// ```console
1286    /// Task 1 started.
1287    /// Task 0 started.
1288    /// Task 2 started.
1289    /// Task 3 started.
1290    /// get_html() called by task 2.
1291    /// Task 2 got the value. (len: 19419)
1292    /// Task 1 got the value. (len: 19419)
1293    /// Task 0 got the value. (len: 19419)
1294    /// Task 3 got the value. (len: 19419)
1295    /// ```
1296    ///
1297    /// # Panics
1298    ///
1299    /// This method panics when the `init` future has panicked. When it happens, only
1300    /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1301    /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1302    /// and 3 above), this method will restart and resolve one of the remaining
1303    /// `init` futures.
1304    ///
1305    pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1306    where
1307        F: Future<Output = Result<V, E>>,
1308        E: Send + Sync + 'static,
1309    {
1310        futures_util::pin_mut!(init);
1311        let hash = self.base.hash(&key);
1312        let key = Arc::new(key);
1313        self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1314            .await
1315            .map(Entry::into_value)
1316    }
1317
1318    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1319    /// owned key, you can pass a reference to the key. If the key does not exist in
1320    /// the cache, the key will be cloned to create new entry in the cache.
1321    pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1322    where
1323        F: Future<Output = Result<V, E>>,
1324        E: Send + Sync + 'static,
1325        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1326    {
1327        futures_util::pin_mut!(init);
1328        let hash = self.base.hash(key);
1329        self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1330            .await
1331            .map(Entry::into_value)
1332    }
1333
1334    /// Inserts a key-value pair into the cache.
1335    ///
1336    /// If the cache has this key present, the value is updated.
1337    pub async fn insert(&self, key: K, value: V) {
1338        let hash = self.base.hash(&key);
1339        let key = Arc::new(key);
1340        self.insert_with_hash(key, hash, value).await;
1341    }
1342
1343    /// Discards any cached value for the key.
1344    ///
1345    /// If you need to get the value that has been discarded, use the
1346    /// [`remove`](#method.remove) method instead.
1347    ///
1348    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1349    /// on the borrowed form _must_ match those for the key type.
1350    pub async fn invalidate<Q>(&self, key: &Q)
1351    where
1352        Q: Equivalent<K> + Hash + ?Sized,
1353    {
1354        let hash = self.base.hash(key);
1355        self.invalidate_with_hash(key, hash, false).await;
1356    }
1357
1358    /// Discards any cached value for the key and returns a _clone_ of the value.
1359    ///
1360    /// If you do not need to get the value that has been discarded, use the
1361    /// [`invalidate`](#method.invalidate) method instead.
1362    ///
1363    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1364    /// on the borrowed form _must_ match those for the key type.
1365    pub async fn remove<Q>(&self, key: &Q) -> Option<V>
1366    where
1367        Q: Equivalent<K> + Hash + ?Sized,
1368    {
1369        let hash = self.base.hash(key);
1370        self.invalidate_with_hash(key, hash, true).await
1371    }
1372
1373    /// Discards all cached values.
1374    ///
1375    /// This method returns immediately by just setting the current time as the
1376    /// invalidation time. `get` and other retrieval methods are guaranteed not to
1377    /// return the entries inserted before or at the invalidation time.
1378    ///
1379    /// The actual removal of the invalidated entries is done as a maintenance task
1380    /// driven by a user thread. For more details, see
1381    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1382    /// level documentation.
1383    ///
1384    /// Like the `invalidate` method, this method does not clear the historic
1385    /// popularity estimator of keys so that it retains the client activities of
1386    /// trying to retrieve an item.
    pub fn invalidate_all(&self) {
        // Only records the invalidation time; the actual removal happens later
        // in the maintenance tasks (see the doc comment above).
        self.base.invalidate_all();
    }
1390
1391    /// Discards cached values that satisfy a predicate.
1392    ///
1393    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
1394    /// closure is called against each cached entry inserted before or at the time
1395    /// when this method was called. If the closure returns `true` that entry will be
1396    /// evicted from the cache.
1397    ///
1398    /// This method returns immediately by not actually removing the invalidated
1399    /// entries. Instead, it just sets the predicate to the cache with the time when
1400    /// this method was called. The actual removal of the invalidated entries is done
1401    /// as a maintenance task driven by a user thread. For more details, see
1402    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1403    /// level documentation.
1404    ///
1405    /// Also the `get` and other retrieval methods will apply the closure to a cached
1406    /// entry to determine if it should have been invalidated. Therefore, it is
1407    /// guaranteed that these methods must not return invalidated values.
1408    ///
1409    /// Note that you must call
1410    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
1411    /// at the cache creation time as the cache needs to maintain additional internal
1412    /// data structures to support this method. Otherwise, calling this method will
1413    /// fail with a
1414    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
1415    ///
1416    /// Like the `invalidate` method, this method does not clear the historic
1417    /// popularity estimator of keys so that it retains the client activities of
1418    /// trying to retrieve an item.
1419    ///
1420    /// [support-invalidation-closures]:
1421    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
1422    /// [invalidation-disabled-error]:
1423    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
1424    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
1425    where
1426        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1427    {
1428        self.base.invalidate_entries_if(Arc::new(predicate))
1429    }
1430
1431    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
1432    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
1433    /// value.
1434    ///
1435    /// Iterators do not block concurrent reads and writes on the cache. An entry can
1436    /// be inserted to, invalidated or evicted from a cache while iterators are alive
1437    /// on the same cache.
1438    ///
    /// Unlike the `get` method, visiting entries via an iterator does not update
    /// the historic popularity estimator or reset idle timers for keys.
1441    ///
1442    /// # Guarantees
1443    ///
1444    /// In order to allow concurrent access to the cache, iterator's `next` method
1445    /// does _not_ guarantee the following:
1446    ///
1447    /// - It does not guarantee to return a key-value pair (an entry) if its key has
1448    ///   been inserted to the cache _after_ the iterator was created.
1449    ///   - Such an entry may or may not be returned depending on key's hash and
1450    ///     timing.
1451    ///
    /// and the `next` method guarantees the following:
1453    ///
1454    /// - It guarantees not to return the same entry more than once.
1455    /// - It guarantees not to return an entry if it has been removed from the cache
1456    ///   after the iterator was created.
1457    ///     - Note: An entry can be removed by following reasons:
1458    ///         - Manually invalidated.
1459    ///         - Expired (e.g. time-to-live).
1460    ///         - Evicted as the cache capacity exceeded.
1461    ///
1462    /// # Examples
1463    ///
1464    /// ```rust
1465    /// // Cargo.toml
1466    /// //
1467    /// // [dependencies]
1468    /// // moka = { version = "0.12", features = ["future"] }
1469    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1470    /// use moka::future::Cache;
1471    ///
1472    /// #[tokio::main]
1473    /// async fn main() {
1474    ///     let cache = Cache::new(100);
1475    ///     cache.insert("Julia", 14).await;
1476    ///
1477    ///     let mut iter = cache.iter();
1478    ///     let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
1479    ///     assert_eq!(*k, "Julia");
1480    ///     assert_eq!(v, 14);
1481    ///
1482    ///     assert!(iter.next().is_none());
1483    /// }
1484    /// ```
1485    ///
1486    pub fn iter(&self) -> Iter<'_, K, V> {
1487        use crate::common::iter::{Iter as InnerIter, ScanningGet};
1488
1489        let inner = InnerIter::with_single_cache_segment(&self.base, self.base.num_cht_segments());
1490        Iter::new(inner)
1491    }
1492
1493    /// Performs any pending maintenance operations needed by the cache.
1494    pub async fn run_pending_tasks(&self) {
1495        if let Some(hk) = &self.base.housekeeper {
1496            self.base.retry_interrupted_ops().await;
1497            hk.run_pending_tasks(Arc::clone(&self.base.inner)).await;
1498        }
1499    }
1500}
1501
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    // Each item is a key-value pair; `V` is a clone of the stored value.
    type Item = (Arc<K>, V);

    type IntoIter = Iter<'a, K, V>;

    /// Enables `for (key, value) in &cache { ... }` by delegating to
    /// [`Cache::iter`](#method.iter).
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
1516
1517//
1518// private methods
1519//
1520impl<K, V, S> Cache<K, V, S>
1521where
1522    K: Hash + Eq + Send + Sync + 'static,
1523    V: Clone + Send + Sync + 'static,
1524    S: BuildHasher + Clone + Send + Sync + 'static,
1525{
1526    pub(crate) async fn get_or_insert_with_hash_and_fun(
1527        &self,
1528        key: Arc<K>,
1529        hash: u64,
1530        init: Pin<&mut impl Future<Output = V>>,
1531        mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1532        need_key: bool,
1533    ) -> Entry<K, V> {
1534        let maybe_entry = self
1535            .base
1536            .get_with_hash(&*key, hash, replace_if.as_mut(), need_key, true)
1537            .await;
1538        if let Some(entry) = maybe_entry {
1539            entry
1540        } else {
1541            self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1542                .await
1543        }
1544    }
1545
1546    pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1547        &self,
1548        key: &Q,
1549        hash: u64,
1550        init: Pin<&mut impl Future<Output = V>>,
1551        mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1552        need_key: bool,
1553    ) -> Entry<K, V>
1554    where
1555        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1556    {
1557        let maybe_entry = self
1558            .base
1559            .get_with_hash(key, hash, replace_if.as_mut(), need_key, true)
1560            .await;
1561        if let Some(entry) = maybe_entry {
1562            entry
1563        } else {
1564            let key = Arc::new(key.to_owned());
1565            self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1566                .await
1567        }
1568    }
1569
1570    async fn insert_with_hash_and_fun(
1571        &self,
1572        key: Arc<K>,
1573        hash: u64,
1574        init: Pin<&mut impl Future<Output = V>>,
1575        replace_if: Option<impl FnMut(&V) -> bool + Send>,
1576        need_key: bool,
1577    ) -> Entry<K, V> {
1578        let k = if need_key {
1579            Some(Arc::clone(&key))
1580        } else {
1581            None
1582        };
1583
1584        let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1585        let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1586
1587        match self
1588            .value_initializer
1589            .try_init_or_read(&key, hash, type_id, self, replace_if, init, post_init)
1590            .await
1591        {
1592            InitResult::Initialized(v) => {
1593                crossbeam_epoch::pin().flush();
1594                Entry::new(k, v, true, false)
1595            }
1596            InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1597            InitResult::InitErr(_) => unreachable!(),
1598        }
1599    }
1600
1601    pub(crate) async fn get_or_insert_with_hash(
1602        &self,
1603        key: Arc<K>,
1604        hash: u64,
1605        init: impl FnOnce() -> V,
1606    ) -> Entry<K, V> {
1607        match self
1608            .base
1609            .get_with_hash(&*key, hash, never_ignore(), true, true)
1610            .await
1611        {
1612            Some(entry) => entry,
1613            None => {
1614                let value = init();
1615                self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1616                    .await;
1617                Entry::new(Some(key), value, true, false)
1618            }
1619        }
1620    }
1621
1622    pub(crate) async fn get_or_insert_with_hash_by_ref<Q>(
1623        &self,
1624        key: &Q,
1625        hash: u64,
1626        init: impl FnOnce() -> V,
1627    ) -> Entry<K, V>
1628    where
1629        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1630    {
1631        match self
1632            .base
1633            .get_with_hash(key, hash, never_ignore(), true, true)
1634            .await
1635        {
1636            Some(entry) => entry,
1637            None => {
1638                let key = Arc::new(key.to_owned());
1639                let value = init();
1640                self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1641                    .await;
1642                Entry::new(Some(key), value, true, false)
1643            }
1644        }
1645    }
1646
1647    pub(crate) async fn get_or_optionally_insert_with_hash_and_fun<F>(
1648        &self,
1649        key: Arc<K>,
1650        hash: u64,
1651        init: Pin<&mut F>,
1652        need_key: bool,
1653    ) -> Option<Entry<K, V>>
1654    where
1655        F: Future<Output = Option<V>>,
1656    {
1657        let entry = self
1658            .base
1659            .get_with_hash(&*key, hash, never_ignore(), need_key, true)
1660            .await;
1661        if entry.is_some() {
1662            return entry;
1663        }
1664
1665        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1666            .await
1667    }
1668
1669    pub(crate) async fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1670        &self,
1671        key: &Q,
1672        hash: u64,
1673        init: Pin<&mut F>,
1674        need_key: bool,
1675    ) -> Option<Entry<K, V>>
1676    where
1677        F: Future<Output = Option<V>>,
1678        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1679    {
1680        let entry = self
1681            .base
1682            .get_with_hash(key, hash, never_ignore(), need_key, true)
1683            .await;
1684        if entry.is_some() {
1685            return entry;
1686        }
1687
1688        let key = Arc::new(key.to_owned());
1689        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1690            .await
1691    }
1692
1693    async fn optionally_insert_with_hash_and_fun<F>(
1694        &self,
1695        key: Arc<K>,
1696        hash: u64,
1697        init: Pin<&mut F>,
1698        need_key: bool,
1699    ) -> Option<Entry<K, V>>
1700    where
1701        F: Future<Output = Option<V>>,
1702    {
1703        let k = if need_key {
1704            Some(Arc::clone(&key))
1705        } else {
1706            None
1707        };
1708
1709        let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1710        let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1711
1712        match self
1713            .value_initializer
1714            .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1715            .await
1716        {
1717            InitResult::Initialized(v) => {
1718                crossbeam_epoch::pin().flush();
1719                Some(Entry::new(k, v, true, false))
1720            }
1721            InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1722            InitResult::InitErr(_) => None,
1723        }
1724    }
1725
1726    pub(super) async fn get_or_try_insert_with_hash_and_fun<F, E>(
1727        &self,
1728        key: Arc<K>,
1729        hash: u64,
1730        init: Pin<&mut F>,
1731        need_key: bool,
1732    ) -> Result<Entry<K, V>, Arc<E>>
1733    where
1734        F: Future<Output = Result<V, E>>,
1735        E: Send + Sync + 'static,
1736    {
1737        if let Some(entry) = self
1738            .base
1739            .get_with_hash(&*key, hash, never_ignore(), need_key, true)
1740            .await
1741        {
1742            return Ok(entry);
1743        }
1744
1745        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1746            .await
1747    }
1748
1749    pub(super) async fn get_or_try_insert_with_hash_by_ref_and_fun<F, E, Q>(
1750        &self,
1751        key: &Q,
1752        hash: u64,
1753        init: Pin<&mut F>,
1754        need_key: bool,
1755    ) -> Result<Entry<K, V>, Arc<E>>
1756    where
1757        F: Future<Output = Result<V, E>>,
1758        E: Send + Sync + 'static,
1759        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1760    {
1761        if let Some(entry) = self
1762            .base
1763            .get_with_hash(key, hash, never_ignore(), need_key, true)
1764            .await
1765        {
1766            return Ok(entry);
1767        }
1768        let key = Arc::new(key.to_owned());
1769        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1770            .await
1771    }
1772
1773    async fn try_insert_with_hash_and_fun<F, E>(
1774        &self,
1775        key: Arc<K>,
1776        hash: u64,
1777        init: Pin<&mut F>,
1778        need_key: bool,
1779    ) -> Result<Entry<K, V>, Arc<E>>
1780    where
1781        F: Future<Output = Result<V, E>>,
1782        E: Send + Sync + 'static,
1783    {
1784        let k = if need_key {
1785            Some(Arc::clone(&key))
1786        } else {
1787            None
1788        };
1789
1790        let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1791        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1792
1793        match self
1794            .value_initializer
1795            .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1796            .await
1797        {
1798            InitResult::Initialized(v) => {
1799                crossbeam_epoch::pin().flush();
1800                Ok(Entry::new(k, v, true, false))
1801            }
1802            InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1803            InitResult::InitErr(e) => {
1804                crossbeam_epoch::pin().flush();
1805                Err(e)
1806            }
1807        }
1808    }
1809
    /// Inserts `value` for `key` into the map, then schedules the resulting
    /// write operation so that the housekeeper can apply it to the policy data
    /// structures.
    pub(crate) async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
        // When the map is disabled (e.g. a zero-capacity cache), inserts are
        // no-ops.
        if self.base.is_map_disabled() {
            return;
        }

        let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await;
        // Async cancellation safety: if our caller's task is cancelled while we
        // await below, the guard forwards the pending op to the interrupted-op
        // channel so it can be retried later.
        let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts);
        cancel_guard.set_op(op.clone());

        // In unit tests, scheduling can be forced to block to exercise the
        // full-channel path deterministically.
        let should_block;
        #[cfg(not(test))]
        {
            should_block = false;
        }
        #[cfg(test)]
        {
            should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
        }

        let hk = self.base.housekeeper.as_ref();
        let event = self.base.write_op_ch_ready_event();

        BaseCache::<K, V, S>::schedule_write_op(
            &self.base.inner,
            &self.base.write_op_ch,
            event,
            op,
            ts,
            hk,
            should_block,
        )
        .await
        .expect("Failed to schedule write op for insert");
        // The op was delivered; nothing remains for the guard to retry.
        cancel_guard.clear();
    }
1845
1846    pub(crate) async fn compute_with_hash_and_fun<F, Fut>(
1847        &self,
1848        key: Arc<K>,
1849        hash: u64,
1850        f: F,
1851    ) -> compute::CompResult<K, V>
1852    where
1853        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1854        Fut: Future<Output = compute::Op<V>>,
1855    {
1856        let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1857        match self
1858            .value_initializer
1859            .try_compute(key, hash, self, f, post_init, true)
1860            .await
1861        {
1862            Ok(result) => result,
1863            Err(_) => unreachable!(),
1864        }
1865    }
1866
1867    pub(crate) async fn try_compute_with_hash_and_fun<F, Fut, E>(
1868        &self,
1869        key: Arc<K>,
1870        hash: u64,
1871        f: F,
1872    ) -> Result<compute::CompResult<K, V>, E>
1873    where
1874        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1875        Fut: Future<Output = Result<compute::Op<V>, E>>,
1876        E: Send + Sync + 'static,
1877    {
1878        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1879        self.value_initializer
1880            .try_compute(key, hash, self, f, post_init, true)
1881            .await
1882    }
1883
1884    pub(crate) async fn try_compute_if_nobody_else_with_hash_and_fun<F, Fut, E>(
1885        &self,
1886        key: Arc<K>,
1887        hash: u64,
1888        f: F,
1889    ) -> Result<compute::CompResult<K, V>, E>
1890    where
1891        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1892        Fut: Future<Output = Result<compute::Op<V>, E>>,
1893        E: Send + Sync + 'static,
1894    {
1895        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with_if_nobody_else;
1896        self.value_initializer
1897            .try_compute_if_nobody_else(key, hash, self, f, post_init, true)
1898            .await
1899    }
1900
1901    pub(crate) async fn upsert_with_hash_and_fun<F, Fut>(
1902        &self,
1903        key: Arc<K>,
1904        hash: u64,
1905        f: F,
1906    ) -> Entry<K, V>
1907    where
1908        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1909        Fut: Future<Output = V>,
1910    {
1911        let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1912        match self
1913            .value_initializer
1914            .try_compute(key, hash, self, f, post_init, false)
1915            .await
1916        {
1917            Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1918            _ => unreachable!(),
1919        }
1920    }
1921
    /// Removes the entry for `key`, notifies the eviction listener (when one is
    /// configured), and schedules the removal write op for the policy data
    /// structures. Returns the removed value when `need_value` is `true` and an
    /// entry existed.
    pub(crate) async fn invalidate_with_hash<Q>(
        &self,
        key: &Q,
        hash: u64,
        need_value: bool,
    ) -> Option<V>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        use futures_util::FutureExt;

        // First, finish any write ops interrupted by async cancellation.
        self.base.retry_interrupted_ops().await;

        // Lock the key for removal if blocking removal notification is enabled.
        let mut kl = None;
        let mut klg = None;
        if self.base.is_removal_notifier_enabled() {
            // To lock the key, we have to get Arc<K> for key (&Q).
            //
            // TODO: Enhance this if possible. This is rather hack now because
            // it cannot prevent race conditions like this:
            //
            // 1. We miss the key because it does not exist. So we do not lock
            //    the key.
            // 2. Somebody else (other thread) inserts the key.
            // 3. We remove the entry for the key, but without the key lock!
            //
            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
                kl = self.base.maybe_key_lock(&arc_key);
                klg = if let Some(lock) = &kl {
                    Some(lock.lock().await)
                } else {
                    None
                };
            }
        }

        match self.base.remove_entry(key, hash) {
            None => None,
            Some(kv) => {
                let now = self.base.current_time();

                // Clone the value only when the caller wants it back.
                let maybe_v = if need_value {
                    Some(kv.entry.value.clone())
                } else {
                    None
                };

                // Bump the entry generation so the scheduled `Remove` op can be
                // matched against this exact version of the entry.
                // NOTE(review): presumably this lets the housekeeper ignore ops
                // for an older generation — confirm in base_cache.
                let info = kv.entry.entry_info();
                let entry_gen = info.incr_entry_gen();

                let op: WriteOp<K, V> = WriteOp::Remove {
                    kv_entry: kv.clone(),
                    entry_gen,
                };

                // Async Cancellation Safety: To ensure the below future should be
                // executed even if our caller async task is cancelled, we create a
                // cancel guard for the future (and the op). If our caller is
                // cancelled while we are awaiting for the future, the cancel guard
                // will save the future and the op to the interrupted_op_ch channel,
                // so that we can resume/retry later.
                let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);

                if self.base.is_removal_notifier_enabled() {
                    // `shared()` lets the guard keep a resumable clone of the
                    // notification future.
                    let future = self
                        .base
                        .notify_invalidate(&kv.key, &kv.entry)
                        .boxed()
                        .shared();
                    cancel_guard.set_future_and_op(future.clone(), op.clone());
                    // Send notification to the eviction listener.
                    future.await;
                    cancel_guard.unset_future();
                } else {
                    cancel_guard.set_op(op.clone());
                }

                // Drop the locks before scheduling write op to avoid a potential
                // dead lock. (Scheduling write can do spin lock when the queue is
                // full, and queue will be drained by the housekeeping thread that
                // can lock the same key)
                std::mem::drop(klg);
                std::mem::drop(kl);

                // In unit tests, scheduling can be forced to block to exercise
                // the full-channel path deterministically.
                let should_block;
                #[cfg(not(test))]
                {
                    should_block = false;
                }
                #[cfg(test)]
                {
                    should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
                }

                let event = self.base.write_op_ch_ready_event();
                let hk = self.base.housekeeper.as_ref();

                BaseCache::<K, V, S>::schedule_write_op(
                    &self.base.inner,
                    &self.base.write_op_ch,
                    event,
                    op,
                    now,
                    hk,
                    should_block,
                )
                .await
                .expect("Failed to schedule write op for remove");
                cancel_guard.clear();

                // Flush the thread-local crossbeam-epoch buffer to help reclaim
                // memory sooner.
                crossbeam_epoch::pin().flush();
                maybe_v
            }
        }
    }
2038}
2039
// For unit tests.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S> {
    /// Returns `true` when the cache currently holds no entries.
    pub(crate) fn is_table_empty(&self) -> bool {
        matches!(self.entry_count(), 0)
    }

    /// Returns `true` when no tasks are waiting on value initialization.
    pub(crate) fn is_waiter_map_empty(&self) -> bool {
        matches!(self.value_initializer.waiter_count(), 0)
    }
}
2052
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Number of invalidation predicates currently registered.
    fn invalidation_predicate_count(&self) -> usize {
        self.base.invalidation_predicate_count()
    }

    /// Switches the cache internals into the deterministic test configuration.
    async fn reconfigure_for_testing(&mut self) {
        self.base.reconfigure_for_testing().await;
    }

    /// Returns `true` when no per-key locks are currently held.
    fn key_locks_map_is_empty(&self) -> bool {
        self.base.key_locks_map_is_empty()
    }

    /// How many times a `run_pending_tasks` call has been initiated.
    ///
    /// Panics when no housekeeper is configured.
    fn run_pending_tasks_initiation_count(&self) -> usize {
        let hk = self
            .base
            .housekeeper
            .as_ref()
            .expect("housekeeper is not set");
        hk.start_count.load(Ordering::Acquire)
    }

    /// How many times a `run_pending_tasks` call has run to completion.
    ///
    /// Panics when no housekeeper is configured.
    fn run_pending_tasks_completion_count(&self) -> usize {
        let hk = self
            .base
            .housekeeper
            .as_ref()
            .expect("housekeeper is not set");
        hk.complete_count.load(Ordering::Acquire)
    }
}
2088
// Returns `None` for the "replace_if" predicate, meaning an existing cached
// value is never ignored by the get-or-insert family of methods.
//
// As of Rust 1.71, we cannot make this function into a `const fn` because mutable
// references are not allowed.
// See [#57349](https://github.com/rust-lang/rust/issues/57349).
#[inline]
fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
    None
}
2096
2097// To see the debug prints, run test as `cargo test -- --nocapture`
2098#[cfg(test)]
2099mod tests {
2100    use super::Cache;
2101    use crate::{
2102        common::{time::Clock, HousekeeperConfig},
2103        future::FutureExt,
2104        notification::{ListenerFuture, RemovalCause},
2105        ops::compute,
2106        policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
2107        Expiry,
2108    };
2109
2110    use async_lock::{Barrier, Mutex};
2111    use std::{
2112        convert::Infallible,
2113        sync::{
2114            atomic::{AtomicU32, AtomicU8, Ordering},
2115            Arc,
2116        },
2117        time::{Duration, Instant as StdInstant},
2118        vec,
2119    };
2120    use tokio::time::sleep;
2121
    /// Compile-time check that every public future-returning API of `Cache`
    /// (including the entry and entry_by_ref builders) produces a `Send`
    /// future, so they can be awaited from multi-threaded executors.
    #[test]
    fn futures_are_send() {
        let cache = Cache::new(0);

        // Accepts any value that implements `Send`; fails to compile otherwise.
        fn is_send(_: impl Send) {}

        // pub fns
        is_send(cache.get(&()));
        is_send(cache.get_with((), async {}));
        is_send(cache.get_with_by_ref(&(), async {}));
        #[allow(deprecated)]
        is_send(cache.get_with_if((), async {}, |_| false));
        is_send(cache.insert((), ()));
        is_send(cache.invalidate(&()));
        is_send(cache.optionally_get_with((), async { None }));
        is_send(cache.optionally_get_with_by_ref(&(), async { None }));
        is_send(cache.remove(&()));
        is_send(cache.run_pending_tasks());
        is_send(cache.try_get_with((), async { Err(()) }));
        is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));

        // entry fns
        is_send(
            cache
                .entry(())
                .and_compute_with(|_| async { compute::Op::Nop }),
        );
        is_send(
            cache
                .entry(())
                .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
        );
        is_send(cache.entry(()).and_upsert_with(|_| async {}));
        is_send(cache.entry(()).or_default());
        is_send(cache.entry(()).or_insert(()));
        is_send(cache.entry(()).or_insert_with(async {}));
        is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
        is_send(cache.entry(()).or_optionally_insert_with(async { None }));
        is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));

        // entry_by_ref fns
        is_send(
            cache
                .entry_by_ref(&())
                .and_compute_with(|_| async { compute::Op::Nop }),
        );
        is_send(
            cache
                .entry_by_ref(&())
                .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
        );
        is_send(cache.entry_by_ref(&()).and_upsert_with(|_| async {}));
        is_send(cache.entry_by_ref(&()).or_default());
        is_send(cache.entry_by_ref(&()).or_insert(()));
        is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
        is_send(
            cache
                .entry_by_ref(&())
                .or_insert_with_if(async {}, |_| false),
        );
        is_send(
            cache
                .entry_by_ref(&())
                .or_optionally_insert_with(async { None }),
        );
        is_send(
            cache
                .entry_by_ref(&())
                .or_try_insert_with(async { Err(()) }),
        );
    }
2193
    /// A cache built with `max_capacity(0)` must never store anything: inserts
    /// are silently dropped and the entry count stays zero.
    #[tokio::test]
    async fn max_capacity_zero() {
        let mut cache = Cache::new(0);
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(0, ()).await;

        // The entry must be absent both before and after maintenance runs.
        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).await.is_none());
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).await.is_none());
        assert_eq!(cache.entry_count(), 0)
    }
2211
    /// Exercises basic insert/get/invalidate/remove behavior of the default
    /// (TinyLFU) eviction policy from a single task, and verifies that the
    /// eviction listener observes exactly the expected removals in order.
    #[tokio::test]
    async fn basic_single_async_task() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.insert("b", "bob").await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        cache.run_pending_tasks().await;
        // counts: a -> 1, b -> 1

        cache.insert("c", "cindy").await;
        assert_eq!(cache.get(&"c").await, Some("cindy"));
        assert!(cache.contains_key(&"c"));
        // counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks().await;

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;
        // counts: a -> 2, b -> 2, c -> 1

        // "d" should not be admitted because its frequency is too low.
        cache.insert("d", "david").await; //   count: d -> 0
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", "david").await;
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d").await, None); //   d -> 2

        // "d" should be admitted and "c" should be evicted
        // because d's frequency is higher than c's.
        cache.insert("d", "dennis").await;
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert_eq!(cache.get(&"c").await, None);
        assert_eq!(cache.get(&"d").await, Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        cache.invalidate(&"b").await;
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").await.is_none());
        assert_eq!(cache.remove(&"d").await, Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        // All notifications must have been delivered, and all key locks
        // released.
        verify_notification_vec(&cache, actual, &expected).await;
        assert!(cache.key_locks_map_is_empty());
    }
2303
    /// Same scenario as `basic_single_async_task`, but with the LRU eviction
    /// policy: a new key is always admitted and the least-recently-used entry
    /// is evicted instead.
    #[tokio::test]
    async fn basic_lru_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_policy(EvictionPolicy::lru())
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.insert("b", "bob").await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        cache.run_pending_tasks().await;
        // a -> b

        cache.insert("c", "cindy").await;
        assert_eq!(cache.get(&"c").await, Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks().await;
        // a -> b -> c

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;
        // c -> a -> b

        // "d" should be admitted because the cache uses the LRU strategy.
        cache.insert("d", "david").await;
        // "c" is the LRU and should have be evicted.
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks().await;

        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert_eq!(cache.get(&"c").await, None);
        assert_eq!(cache.get(&"d").await, Some("david"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));
        cache.run_pending_tasks().await;
        // a -> b -> d

        cache.invalidate(&"b").await;
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        // a -> d
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").await.is_none());
        assert_eq!(cache.remove(&"d").await, Some("david"));
        expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
        cache.run_pending_tasks().await;
        // a
        assert_eq!(cache.get(&"d").await, None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("e", "emily").await;
        cache.insert("f", "frank").await;
        // "a" should be evicted because it is the LRU.
        cache.insert("g", "gina").await;
        expected.push((Arc::new("a"), "alice", RemovalCause::Size));
        cache.run_pending_tasks().await;
        // e -> f -> g
        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"e").await, Some("emily"));
        assert_eq!(cache.get(&"f").await, Some("frank"));
        assert_eq!(cache.get(&"g").await, Some("gina"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"e"));
        assert!(cache.contains_key(&"f"));
        assert!(cache.contains_key(&"g"));

        // All notifications must have been delivered, and all key locks
        // released.
        verify_notification_vec(&cache, actual, &expected).await;
        assert!(cache.key_locks_map_is_empty());
    }
2404
    #[tokio::test]
    async fn size_aware_eviction() {
        // The weigher reports the `u32` in each value tuple as the entry's
        // weight, so `max_capacity(31)` below is a total-weight budget rather
        // than an entry count.
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        // (name, weight) pairs used as cache values throughout the test.
        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        // It appends every (key, value, cause) notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(31)
            .weigher(weigher)
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", alice).await;
        cache.insert("b", bob).await;
        assert_eq!(cache.get(&"a").await, Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b").await, Some(bob));
        cache.run_pending_tasks().await;
        // order (LRU -> MRU) and counts: a -> 1, b -> 1

        cache.insert("c", cindy).await;
        assert_eq!(cache.get(&"c").await, Some(cindy));
        assert!(cache.contains_key(&"c"));
        // order and counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks().await;

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a").await, Some(alice));
        assert_eq!(cache.get(&"b").await, Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;
        // order and counts: c -> 1, a -> 2, b -> 2

        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
        // "d" must have higher count than 3, which is the aggregated count
        // of "a" and "c".
        // Each rejected insert below still bumps "d"'s frequency count, so
        // after enough attempts "d" will finally be admitted.
        cache.insert("d", david).await; //   count: d -> 0
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d").await, None); //   d -> 2

        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"d").await, None); //   d -> 3
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david).await;
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d").await, None); //   d -> 4

        // Finally "d" should be admitted by evicting "c" and "a".
        cache.insert("d", dennis).await;
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, Some(bob));
        assert_eq!(cache.get(&"c").await, None);
        assert_eq!(cache.get(&"d").await, Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
        cache.insert("b", bill).await;
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"b").await, Some(bill));
        assert_eq!(cache.get(&"d").await, None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
        cache.insert("a", alice).await;
        cache.insert("b", bob).await;
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&"a").await, Some(alice));
        assert_eq!(cache.get(&"b").await, Some(bob));
        assert_eq!(cache.get(&"d").await, None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Verify the sizes: "a" (10) + "b" (15) = 25.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected).await;
        assert!(cache.key_locks_map_is_empty());
    }
2533
2534    #[tokio::test]
2535    async fn basic_multi_async_tasks() {
2536        let num_tasks = 2;
2537        let num_threads = 2;
2538
2539        let cache = Cache::new(100);
2540        let barrier = Arc::new(Barrier::new(num_tasks + num_threads as usize));
2541
2542        let tasks = (0..num_tasks)
2543            .map(|id| {
2544                let cache = cache.clone();
2545                let barrier = Arc::clone(&barrier);
2546
2547                tokio::spawn(async move {
2548                    barrier.wait().await;
2549
2550                    cache.insert(10, format!("{id}-100")).await;
2551                    cache.get(&10).await;
2552                    cache.insert(20, format!("{id}-200")).await;
2553                    cache.invalidate(&10).await;
2554                })
2555            })
2556            .collect::<Vec<_>>();
2557
2558        let threads = (0..num_threads)
2559            .map(|id| {
2560                let cache = cache.clone();
2561                let barrier = Arc::clone(&barrier);
2562                let rt = tokio::runtime::Handle::current();
2563
2564                std::thread::spawn(move || {
2565                    rt.block_on(barrier.wait());
2566
2567                    rt.block_on(cache.insert(10, format!("{id}-100")));
2568                    rt.block_on(cache.get(&10));
2569                    rt.block_on(cache.insert(20, format!("{id}-200")));
2570                    rt.block_on(cache.invalidate(&10));
2571                })
2572            })
2573            .collect::<Vec<_>>();
2574
2575        let _ = futures_util::future::join_all(tasks).await;
2576        threads.into_iter().for_each(|t| t.join().unwrap());
2577
2578        assert!(cache.get(&10).await.is_none());
2579        assert!(cache.get(&20).await.is_some());
2580        assert!(!cache.contains_key(&10));
2581        assert!(cache.contains_key(&20));
2582    }
2583
    #[tokio::test]
    async fn invalidate_all() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        // It appends every (key, value, cause) notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.insert("b", "bob").await;
        cache.insert("c", "cindy").await;
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert_eq!(cache.get(&"c").await, Some("cindy"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(cache.contains_key(&"c"));

        // `cache.run_pending_tasks().await` is no longer needed here before invalidating. The last
        // modified timestamp of the entries were updated when they were inserted.
        // https://github.com/moka-rs/moka/issues/155

        // All three existing entries should be removed with the `Explicit`
        // cause once the pending tasks run.
        cache.invalidate_all();
        expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
        cache.run_pending_tasks().await;

        // An entry inserted after the `invalidate_all` call must survive it.
        cache.insert("d", "david").await;
        cache.run_pending_tasks().await;

        assert!(cache.get(&"a").await.is_none());
        assert!(cache.get(&"b").await.is_none());
        assert!(cache.get(&"c").await.is_none());
        assert_eq!(cache.get(&"d").await, Some("david"));
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected).await;
    }
2644
2645    // This test is for https://github.com/moka-rs/moka/issues/155
2646    #[tokio::test]
2647    async fn invalidate_all_without_running_pending_tasks() {
2648        let cache = Cache::new(1024);
2649
2650        assert_eq!(cache.get(&0).await, None);
2651        cache.insert(0, 1).await;
2652        assert_eq!(cache.get(&0).await, Some(1));
2653
2654        cache.invalidate_all();
2655        assert_eq!(cache.get(&0).await, None);
2656    }
2657
2658    #[tokio::test]
2659    async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
2660        use std::collections::HashSet;
2661
2662        // The following `Vec`s will hold actual and expected notifications.
2663        let actual = Arc::new(Mutex::new(Vec::new()));
2664        let mut expected = Vec::new();
2665
2666        // Create an eviction listener.
2667        let a1 = Arc::clone(&actual);
2668        let listener = move |k, v, cause| -> ListenerFuture {
2669            let a2 = Arc::clone(&a1);
2670            async move {
2671                a2.lock().await.push((k, v, cause));
2672            }
2673            .boxed()
2674        };
2675
2676        let (clock, mock) = Clock::mock();
2677
2678        // Create a cache with the eviction listener.
2679        let mut cache = Cache::builder()
2680            .max_capacity(100)
2681            .support_invalidation_closures()
2682            .async_eviction_listener(listener)
2683            .clock(clock)
2684            .build();
2685        cache.reconfigure_for_testing().await;
2686
2687        // Make the cache exterior immutable.
2688        let cache = cache;
2689
2690        cache.insert(0, "alice").await;
2691        cache.insert(1, "bob").await;
2692        cache.insert(2, "alex").await;
2693        cache.run_pending_tasks().await;
2694
2695        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2696        cache.run_pending_tasks().await;
2697
2698        assert_eq!(cache.get(&0).await, Some("alice"));
2699        assert_eq!(cache.get(&1).await, Some("bob"));
2700        assert_eq!(cache.get(&2).await, Some("alex"));
2701        assert!(cache.contains_key(&0));
2702        assert!(cache.contains_key(&1));
2703        assert!(cache.contains_key(&2));
2704
2705        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
2706        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
2707        assert_eq!(cache.invalidation_predicate_count(), 1);
2708        expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
2709        expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
2710
2711        mock.increment(Duration::from_secs(5)); // 10 secs from the start.
2712
2713        cache.insert(3, "alice").await;
2714
2715        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2716        cache.run_pending_tasks().await; // To submit the invalidation task.
2717        std::thread::sleep(Duration::from_millis(200));
2718        cache.run_pending_tasks().await; // To process the task result.
2719        std::thread::sleep(Duration::from_millis(200));
2720
2721        assert!(cache.get(&0).await.is_none());
2722        assert!(cache.get(&2).await.is_none());
2723        assert_eq!(cache.get(&1).await, Some("bob"));
2724        // This should survive as it was inserted after calling invalidate_entries_if.
2725        assert_eq!(cache.get(&3).await, Some("alice"));
2726
2727        assert!(!cache.contains_key(&0));
2728        assert!(cache.contains_key(&1));
2729        assert!(!cache.contains_key(&2));
2730        assert!(cache.contains_key(&3));
2731
2732        assert_eq!(cache.entry_count(), 2);
2733        assert_eq!(cache.invalidation_predicate_count(), 0);
2734
2735        mock.increment(Duration::from_secs(5)); // 15 secs from the start.
2736
2737        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
2738        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
2739        assert_eq!(cache.invalidation_predicate_count(), 2);
2740        // key 1 was inserted before key 3.
2741        expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
2742        expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
2743
2744        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2745        cache.run_pending_tasks().await; // To submit the invalidation task.
2746        std::thread::sleep(Duration::from_millis(200));
2747        cache.run_pending_tasks().await; // To process the task result.
2748        std::thread::sleep(Duration::from_millis(200));
2749
2750        assert!(cache.get(&1).await.is_none());
2751        assert!(cache.get(&3).await.is_none());
2752
2753        assert!(!cache.contains_key(&1));
2754        assert!(!cache.contains_key(&3));
2755
2756        assert_eq!(cache.entry_count(), 0);
2757        assert_eq!(cache.invalidation_predicate_count(), 0);
2758
2759        verify_notification_vec(&cache, actual, &expected).await;
2760
2761        Ok(())
2762    }
2763
    #[tokio::test]
    async fn time_to_live() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        // It appends every (key, value, cause) notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        let (clock, mock) = Clock::mock();

        // Create a cache with a 10-second TTL and the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .time_to_live(Duration::from_secs(10))
            .async_eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.run_pending_tasks().await;

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks().await;

        // Halfway through the TTL, the entry is still visible.
        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert!(cache.contains_key(&"a"));

        // At 10 secs the TTL has elapsed; reads observe the expiration
        // immediately, before `run_pending_tasks` is called.
        mock.increment(Duration::from_secs(5)); // 10 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
        assert_eq!(cache.get(&"a").await, None);
        assert!(!cache.contains_key(&"a"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks().await;
        assert!(cache.is_table_empty());

        cache.insert("b", "bob").await;
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 15 secs.
        cache.run_pending_tasks().await;

        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // Updating an entry resets its TTL and fires a `Replaced`
        // notification for the old value.
        cache.insert("b", "bill").await;
        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
        cache.run_pending_tasks().await;

        mock.increment(Duration::from_secs(5)); // 20 secs
        cache.run_pending_tasks().await;

        assert_eq!(cache.get(&"b").await, Some("bill"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // 10 secs after the update (the TTL restarted at 15 secs), "bill"
        // expires as well.
        mock.increment(Duration::from_secs(5)); // 25 secs
        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));

        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks().await;
        assert!(cache.is_table_empty());

        verify_notification_vec(&cache, actual, &expected).await;
    }
2851
    #[tokio::test]
    async fn time_to_idle() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        // It appends every (key, value, cause) notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        let (clock, mock) = Clock::mock();

        // Create a cache with a 10-second time-to-idle and the listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .time_to_idle(Duration::from_secs(10))
            .async_eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.run_pending_tasks().await;

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks().await;

        // This read resets "a"'s idle timer (last accessed at 5 secs).
        assert_eq!(cache.get(&"a").await, Some("alice"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        cache.run_pending_tasks().await;

        cache.insert("b", "bob").await;
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(2)); // 12 secs.
        cache.run_pending_tasks().await;

        // contains_key does not reset the idle timer for the key.
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 2);

        // At 15 secs, "a" has been idle for 10 secs (last read at 5 secs) and
        // expires. "b" was inserted at 10 secs, so it survives.
        mock.increment(Duration::from_secs(3)); // 15 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));

        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 1);

        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 1);

        // At 25 secs, "b" has been idle for 10 secs (last read at 15 secs)
        // and expires too.
        mock.increment(Duration::from_secs(10)); // 25 secs
        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));

        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks().await;
        assert!(cache.is_table_empty());

        verify_notification_vec(&cache, actual, &expected).await;
    }
2936
    // https://github.com/moka-rs/moka/issues/359
    //
    // A read must refresh the entry's last-access time right away, not only
    // after `run_pending_tasks` drains the read-op channel.
    #[tokio::test]
    async fn ensure_access_time_is_updated_immediately_after_read() {
        let (clock, mock) = Clock::mock();
        let mut cache = Cache::builder()
            .max_capacity(10)
            .time_to_idle(Duration::from_secs(5))
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;
        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(1, 1).await;

        // Read at t = 4s, within the 5s TTI; this should reset the idle timer
        // even though `run_pending_tasks` has not been called yet.
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&1).await, Some(1));

        // At t = 6s the entry has only been idle for 2s (< 5s TTI), so it
        // must still be present both before and after the pending tasks run.
        mock.increment(Duration::from_secs(2));
        assert_eq!(cache.get(&1).await, Some(1));
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&1).await, Some(1));
    }
2960
    #[tokio::test]
    async fn time_to_live_by_expiry_type() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Define an expiry type.
        // It emulates a 10-second TTL by returning a fixed duration from both
        // `expire_after_create` and `expire_after_update`, while counting how
        // many times each hook is invoked.
        struct MyExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl MyExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for MyExpiry {
            // Called when an entry is inserted; gives it a 10-second lifetime.
            fn expire_after_create(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_creations();
                Some(Duration::from_secs(10))
            }

            // Called when an entry is updated; restarts the 10-second lifetime.
            fn expire_after_update(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
            ) -> Option<Duration> {
                self.counters.incl_actual_updates();
                Some(Duration::from_secs(10))
            }
        }

        // Create an eviction listener.
        // It appends every (key, value, cause) notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));

        let (clock, mock) = Clock::mock();

        // Create a cache with the expiry and eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .async_eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks().await;

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks().await;

        assert_eq!(cache.get(&"a").await, Some("alice"));
        assert!(cache.contains_key(&"a"));

        // At 10 secs the expiry-supplied lifetime has elapsed.
        mock.increment(Duration::from_secs(5)); // 10 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
        assert_eq!(cache.get(&"a").await, None);
        assert!(!cache.contains_key(&"a"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks().await;
        assert!(cache.is_table_empty());

        cache.insert("b", "bob").await;
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 15 secs.
        cache.run_pending_tasks().await;

        assert_eq!(cache.get(&"b").await, Some("bob"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // Updating "b" triggers `expire_after_update`, restarting its
        // lifetime from this point (15 secs).
        cache.insert("b", "bill").await;
        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
        expiry_counters.incl_expected_updates();
        cache.run_pending_tasks().await;

        mock.increment(Duration::from_secs(5)); // 20 secs
        cache.run_pending_tasks().await;

        assert_eq!(cache.get(&"b").await, Some("bill"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 25 secs
        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));

        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks().await;
        assert!(cache.is_table_empty());

        // Check that each expiry hook ran exactly as often as expected.
        expiry_counters.verify();
        verify_notification_vec(&cache, actual, &expected).await;
    }
3090
    #[tokio::test]
    async fn time_to_idle_by_expiry_type() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Define an expiry type.
        // It emulates a 10-second time-to-idle by returning a fixed duration
        // from `expire_after_read`, while counting how often the hook runs.
        struct MyExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl MyExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for MyExpiry {
            // Called on every read; extends the entry's life by 10 secs from
            // the time of the read.
            fn expire_after_read(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
                _last_modified_at: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_reads();
                Some(Duration::from_secs(10))
            }
        }

        // Create an eviction listener.
        // It appends every (key, value, cause) notification to `actual`.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));

        let (clock, mock) = Clock::mock();

        // Create a cache with the expiry and eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .async_eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice").await;
        cache.run_pending_tasks().await;

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks().await;

        // This read invokes `expire_after_read`, extending "a" to 15 secs.
        assert_eq!(cache.get(&"a").await, Some("alice"));
        expiry_counters.incl_expected_reads();

        mock.increment(Duration::from_secs(5)); // 10 secs.
        cache.run_pending_tasks().await;

        cache.insert("b", "bob").await;
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(2)); // 12 secs.
        cache.run_pending_tasks().await;

        // contains_key does not reset the idle timer for the key.
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 2);

        // At 15 secs "a" expires (10 secs after its last read at 5 secs);
        // the surviving read of "b" invokes `expire_after_read` again.
        mock.increment(Duration::from_secs(3)); // 15 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));

        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, Some("bob"));
        expiry_counters.incl_expected_reads();
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 1);

        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 1);

        // At 25 secs "b" expires (10 secs after its last read at 15 secs).
        mock.increment(Duration::from_secs(10)); // 25 secs
        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));

        assert_eq!(cache.get(&"a").await, None);
        assert_eq!(cache.get(&"b").await, None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks().await;
        assert!(cache.is_table_empty());

        // Check that the read hook ran exactly as often as expected.
        expiry_counters.verify();
        verify_notification_vec(&cache, actual, &expected).await;
    }
3207
    /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
    /// only when the key was already present in the cache.
    #[tokio::test]
    async fn test_expiry_using_get_with() {
        // Define an expiry type which always returns `None` (no expiration), so we
        // can count how often each callback is invoked without evicting anything.
        struct NoExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl NoExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for NoExpiry {
            fn expire_after_create(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_creations();
                None
            }

            fn expire_after_read(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
                _last_modified_at: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_reads();
                None
            }

            fn expire_after_update(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
            ) -> Option<Duration> {
                unreachable!("The `expire_after_update()` method should not be called.");
            }
        }

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = NoExpiry::new(Arc::clone(&expiry_counters));

        // Create a cache with the expiry. (No eviction listener is used in this
        // test.)
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        // The key is not present, so the init future is evaluated and
        // `expire_after_create()` is called.
        cache.get_with("a", async { "alice" }).await;
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks().await;

        // The key is present, so the init future is not evaluated and
        // `expire_after_read()` is called.
        cache.get_with("a", async { "alex" }).await;
        expiry_counters.incl_expected_reads();
        cache.run_pending_tasks().await;

        // The key is not present (just invalidated), so the init future is
        // evaluated and `expire_after_create()` is called again.
        cache.invalidate("a").await;
        cache.get_with("a", async { "amanda" }).await;
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks().await;

        expiry_counters.verify();
    }
3289
    // https://github.com/moka-rs/moka/issues/345
    #[tokio::test]
    async fn test_race_between_updating_entry_and_processing_its_write_ops() {
        let (clock, mock) = Clock::mock();
        let cache = Cache::builder()
            .max_capacity(2)
            .time_to_idle(Duration::from_secs(1))
            .clock(clock)
            .build();

        cache.insert("a", "alice").await;
        cache.insert("b", "bob").await;
        cache.insert("c", "cathy").await; // c1
        mock.increment(Duration::from_secs(2));

        // The following `insert` does the following:
        // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
        // 2. Runs the pending tasks implicitly.
        //    (1) "a" will be admitted.
        //    (2) "b" will be admitted.
        //    (3) c1 will be evicted by size constraint.
        //    (4) "a" will be evicted due to expiration.
        //    (5) "b" will be evicted due to expiration.
        // 3. Sends its `WriteOp` log to the channel.
        cache.insert("c", "cindy").await; // c2

        // Remove "c" (c2) from the cht.
        assert_eq!(cache.remove(&"c").await, Some("cindy")); // c-remove

        mock.increment(Duration::from_secs(2));

        // The following `run_pending_tasks` does the following:
        // 1. Admits "c" (c2) to the cache. (Creates a node in the LRU deque)
        // 2. Because of c-remove, removes c2's node from the LRU deque.
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 0);
    }
3327
    /// Ensure that processing a stale `WriteOp` for a key does not remove a newer
    /// entry that was created for the same key afterwards.
    #[tokio::test]
    async fn test_race_between_recreating_entry_and_processing_its_write_ops() {
        let cache = Cache::builder().max_capacity(2).build();

        cache.insert('a', "a").await;
        cache.insert('b', "b").await;
        cache.run_pending_tasks().await;

        cache.insert('c', "c1").await; // (a) `EntryInfo` 1, gen: 1
        assert!(cache.remove(&'a').await.is_some()); // (b)
        assert!(cache.remove(&'b').await.is_some()); // (c)
        assert!(cache.remove(&'c').await.is_some()); // (d) `EntryInfo` 1, gen: 2
        cache.insert('c', "c2").await; // (e) `EntryInfo` 2, gen: 1

        // Now the `write_op_ch` channel contains the following `WriteOp`s:
        //
        // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
        // - 1: (b) remove "a"
        // - 2: (c) remove "b"
        // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
        // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
        //
        // 0 for "c1" is going to be rejected because the cache is full. Ensure that
        // processing 0 does not remove "c2" from the concurrent hash table. (Their
        // gens are the same, but the `EntryInfo`s are different.)
        cache.run_pending_tasks().await;
        assert_eq!(cache.get(&'c').await, Some("c2"));
    }
3356
3357    #[tokio::test]
3358    async fn test_iter() {
3359        const NUM_KEYS: usize = 50;
3360
3361        fn make_value(key: usize) -> String {
3362            format!("val: {key}")
3363        }
3364
3365        let cache = Cache::builder()
3366            .max_capacity(100)
3367            .time_to_idle(Duration::from_secs(10))
3368            .build();
3369
3370        for key in 0..NUM_KEYS {
3371            cache.insert(key, make_value(key)).await;
3372        }
3373
3374        let mut key_set = std::collections::HashSet::new();
3375
3376        for (key, value) in &cache {
3377            assert_eq!(value, make_value(*key));
3378
3379            key_set.insert(*key);
3380        }
3381
3382        // Ensure there are no missing or duplicate keys in the iteration.
3383        assert_eq!(key_set.len(), NUM_KEYS);
3384    }
3385
    /// Runs 16 async tasks at the same time and ensures no deadlock occurs.
    ///
    /// - Eight of the tasks will update key-values in the cache.
    /// - Eight others will iterate the cache.
    ///
    #[tokio::test]
    async fn test_iter_multi_async_tasks() {
        use std::collections::HashSet;

        const NUM_KEYS: usize = 1024;
        const NUM_TASKS: usize = 16;

        // Expected value for a given key.
        fn make_value(key: usize) -> String {
            format!("val: {key}")
        }

        let cache = Cache::builder()
            .max_capacity(2048)
            .time_to_idle(Duration::from_secs(10))
            .build();

        // Initialize the cache.
        for key in 0..NUM_KEYS {
            cache.insert(key, make_value(key)).await;
        }

        // The write lock is held while the tasks are spawned so that they all
        // start their work at (roughly) the same time.
        let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
        let write_lock = rw_lock.write().await;

        let tasks = (0..NUM_TASKS)
            .map(|n| {
                let cache = cache.clone();
                let rw_lock = Arc::clone(&rw_lock);

                if n % 2 == 0 {
                    // This task will update the cache.
                    tokio::spawn(async move {
                        let read_lock = rw_lock.read().await;
                        for key in 0..NUM_KEYS {
                            // TODO: Update keys in a random order?
                            cache.insert(key, make_value(key)).await;
                        }
                        std::mem::drop(read_lock);
                    })
                } else {
                    // This task will iterate the cache.
                    tokio::spawn(async move {
                        let read_lock = rw_lock.read().await;
                        let mut key_set = HashSet::new();
                        for (key, value) in &cache {
                            assert_eq!(value, make_value(*key));
                            key_set.insert(*key);
                        }
                        // Ensure there are no missing or duplicate keys in the iteration.
                        assert_eq!(key_set.len(), NUM_KEYS);
                        std::mem::drop(read_lock);
                    })
                }
            })
            .collect::<Vec<_>>();

        // Let these tasks run by releasing the write lock.
        std::mem::drop(write_lock);

        let _ = futures_util::future::join_all(tasks).await;

        // Ensure there are no missing or duplicate keys in the iteration.
        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
        assert_eq!(key_set.len(), NUM_KEYS);
    }
3458
3459    #[tokio::test]
3460    async fn get_with() {
3461        let cache = Cache::new(100);
3462        const KEY: u32 = 0;
3463
3464        // This test will run five async tasks:
3465        //
3466        // Task1 will be the first task to call `get_with` for a key, so its async
3467        // block will be evaluated and then a &str value "task1" will be inserted to
3468        // the cache.
3469        let task1 = {
3470            let cache1 = cache.clone();
3471            async move {
3472                // Call `get_with` immediately.
3473                let v = cache1
3474                    .get_with(KEY, async {
3475                        // Wait for 300 ms and return a &str value.
3476                        sleep(Duration::from_millis(300)).await;
3477                        "task1"
3478                    })
3479                    .await;
3480                assert_eq!(v, "task1");
3481            }
3482        };
3483
3484        // Task2 will be the second task to call `get_with` for the same key, so its
3485        // async block will not be evaluated. Once task1's async block finishes, it
3486        // will get the value inserted by task1's async block.
3487        let task2 = {
3488            let cache2 = cache.clone();
3489            async move {
3490                // Wait for 100 ms before calling `get_with`.
3491                sleep(Duration::from_millis(100)).await;
3492                let v = cache2.get_with(KEY, async { unreachable!() }).await;
3493                assert_eq!(v, "task1");
3494            }
3495        };
3496
3497        // Task3 will be the third task to call `get_with` for the same key. By the
3498        // time it calls, task1's async block should have finished already and the
3499        // value should be already inserted to the cache. So its async block will not
3500        // be evaluated and will get the value inserted by task1's async block
3501        // immediately.
3502        let task3 = {
3503            let cache3 = cache.clone();
3504            async move {
3505                // Wait for 400 ms before calling `get_with`.
3506                sleep(Duration::from_millis(400)).await;
3507                let v = cache3.get_with(KEY, async { unreachable!() }).await;
3508                assert_eq!(v, "task1");
3509            }
3510        };
3511
3512        // Task4 will call `get` for the same key. It will call when task1's async
3513        // block is still running, so it will get none for the key.
3514        let task4 = {
3515            let cache4 = cache.clone();
3516            async move {
3517                // Wait for 200 ms before calling `get`.
3518                sleep(Duration::from_millis(200)).await;
3519                let maybe_v = cache4.get(&KEY).await;
3520                assert!(maybe_v.is_none());
3521            }
3522        };
3523
3524        // Task5 will call `get` for the same key. It will call after task1's async
3525        // block finished, so it will get the value insert by task1's async block.
3526        let task5 = {
3527            let cache5 = cache.clone();
3528            async move {
3529                // Wait for 400 ms before calling `get`.
3530                sleep(Duration::from_millis(400)).await;
3531                let maybe_v = cache5.get(&KEY).await;
3532                assert_eq!(maybe_v, Some("task1"));
3533            }
3534        };
3535
3536        futures_util::join!(task1, task2, task3, task4, task5);
3537
3538        assert!(cache.is_waiter_map_empty());
3539    }
3540
    #[tokio::test]
    async fn get_with_by_ref() {
        let cache = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run five async tasks:
        //
        // Task1 will be the first task to call `get_with_by_ref` for a key, so its async
        // block will be evaluated and then a &str value "task1" will be inserted to
        // the cache.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                // Call `get_with_by_ref` immediately.
                let v = cache1
                    .get_with_by_ref(KEY, async {
                        // Wait for 300 ms and return a &str value.
                        sleep(Duration::from_millis(300)).await;
                        "task1"
                    })
                    .await;
                assert_eq!(v, "task1");
            }
        };

        // Task2 will be the second task to call `get_with_by_ref` for the same key, so its
        // async block will not be evaluated. Once task1's async block finishes, it
        // will get the value inserted by task1's async block.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                // Wait for 100 ms before calling `get_with_by_ref`.
                sleep(Duration::from_millis(100)).await;
                let v = cache2.get_with_by_ref(KEY, async { unreachable!() }).await;
                assert_eq!(v, "task1");
            }
        };

        // Task3 will be the third task to call `get_with_by_ref` for the same key. By the
        // time it calls, task1's async block should have finished already and the
        // value should be already inserted to the cache. So its async block will not
        // be evaluated and will get the value inserted by task1's async block
        // immediately.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                // Wait for 400 ms before calling `get_with_by_ref`.
                sleep(Duration::from_millis(400)).await;
                let v = cache3.get_with_by_ref(KEY, async { unreachable!() }).await;
                assert_eq!(v, "task1");
            }
        };

        // Task4 will call `get` for the same key. It will call when task1's async
        // block is still running, so it will get none for the key.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200)).await;
                let maybe_v = cache4.get(KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task5 will call `get` for the same key. It will call after task1's async
        // block finished, so it will get the value inserted by task1's async block.
        let task5 = {
            let cache5 = cache.clone();
            async move {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400)).await;
                let maybe_v = cache5.get(KEY).await;
                assert_eq!(maybe_v, Some("task1"));
            }
        };

        futures_util::join!(task1, task2, task3, task4, task5);

        // All waiters should have been removed from the internal waiter map.
        assert!(cache.is_waiter_map_empty());
    }
3622
    #[tokio::test]
    async fn entry_or_insert_with_if() {
        let cache = Cache::new(100);
        const KEY: u32 = 0;

        // This test will run seven async tasks:
        //
        // Task1 will be the first task to call `or_insert_with_if` for a key, so its
        // async block will be evaluated and then a &str value "task1" will be
        // inserted to the cache.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                // Call `or_insert_with_if` immediately.
                let entry = cache1
                    .entry(KEY)
                    .or_insert_with_if(
                        async {
                            // Wait for 300 ms and return a &str value.
                            sleep(Duration::from_millis(300)).await;
                            "task1"
                        },
                        |_v| unreachable!(),
                    )
                    .await;
                // Entry should be fresh because our async block should have been
                // evaluated.
                assert!(entry.is_fresh());
                assert_eq!(entry.into_value(), "task1");
            }
        };

        // Task2 will be the second task to call `or_insert_with_if` for the same
        // key, so its async block will not be evaluated. Once task1's async block
        // finishes, it will get the value inserted by task1's async block.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                // Wait for 100 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(100)).await;
                let entry = cache2
                    .entry(KEY)
                    .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
                    .await;
                // Entry should not be fresh because task1's async block should have
                // been evaluated instead of ours.
                assert!(!entry.is_fresh());
                assert_eq!(entry.into_value(), "task1");
            }
        };

        // Task3 will be the third task to call `or_insert_with_if` for the same key.
        // By the time it calls, task1's async block should have finished already and
        // the value should be already inserted to the cache. Also task3's
        // `replace_if` closure returns `false`. So its async block will not be
        // evaluated and will get the value inserted by task1's async block
        // immediately.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                // Wait for 350 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(350)).await;
                let entry = cache3
                    .entry(KEY)
                    .or_insert_with_if(async { unreachable!() }, |v| {
                        assert_eq!(v, &"task1");
                        false
                    })
                    .await;
                assert!(!entry.is_fresh());
                assert_eq!(entry.into_value(), "task1");
            }
        };

        // Task4 will be the fourth task to call `or_insert_with_if` for the same
        // key. The value should have been already inserted to the cache by task1.
        // However task4's `replace_if` closure returns `true`. So its async block
        // will be evaluated to replace the current value.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                // Wait for 400 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(400)).await;
                let entry = cache4
                    .entry(KEY)
                    .or_insert_with_if(async { "task4" }, |v| {
                        assert_eq!(v, &"task1");
                        true
                    })
                    .await;
                assert!(entry.is_fresh());
                assert_eq!(entry.into_value(), "task4");
            }
        };

        // Task5 will call `get` for the same key. It will call when task1's async
        // block is still running, so it will get none for the key.
        let task5 = {
            let cache5 = cache.clone();
            async move {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200)).await;
                let maybe_v = cache5.get(&KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task6 will call `get` for the same key. It will call after task1's async
        // block finished, so it will get the value inserted by task1's async block.
        let task6 = {
            let cache6 = cache.clone();
            async move {
                // Wait for 350 ms before calling `get`.
                sleep(Duration::from_millis(350)).await;
                let maybe_v = cache6.get(&KEY).await;
                assert_eq!(maybe_v, Some("task1"));
            }
        };

        // Task7 will call `get` for the same key. It will call after task4's async
        // block finished, so it will get the value inserted by task4's async block.
        let task7 = {
            let cache7 = cache.clone();
            async move {
                // Wait for 450 ms before calling `get`.
                sleep(Duration::from_millis(450)).await;
                let maybe_v = cache7.get(&KEY).await;
                assert_eq!(maybe_v, Some("task4"));
            }
        };

        futures_util::join!(task1, task2, task3, task4, task5, task6, task7);

        // All waiters should have been removed from the internal waiter map.
        assert!(cache.is_waiter_map_empty());
    }
3758
    #[tokio::test]
    async fn entry_by_ref_or_insert_with_if() {
        let cache = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run seven async tasks:
        //
        // Task1 will be the first task to call `or_insert_with_if` for a key, so its
        // async block will be evaluated and then a &str value "task1" will be
        // inserted to the cache.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                // Call `or_insert_with_if` immediately.
                let entry = cache1
                    .entry_by_ref(KEY)
                    .or_insert_with_if(
                        async {
                            // Wait for 300 ms and return a &str value.
                            sleep(Duration::from_millis(300)).await;
                            "task1"
                        },
                        |_v| unreachable!(),
                    )
                    .await;
                // Entry should be fresh because our async block should have been
                // evaluated.
                assert!(entry.is_fresh());
                assert_eq!(entry.into_value(), "task1");
            }
        };

        // Task2 will be the second task to call `or_insert_with_if` for the same
        // key, so its async block will not be evaluated. Once task1's async block
        // finishes, it will get the value inserted by task1's async block.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                // Wait for 100 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(100)).await;
                let entry = cache2
                    .entry_by_ref(KEY)
                    .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
                    .await;
                // Entry should not be fresh because task1's async block should have
                // been evaluated instead of ours.
                assert!(!entry.is_fresh());
                assert_eq!(entry.into_value(), "task1");
            }
        };

        // Task3 will be the third task to call `or_insert_with_if` for the same key.
        // By the time it calls, task1's async block should have finished already and
        // the value should be already inserted to the cache. Also task3's
        // `replace_if` closure returns `false`. So its async block will not be
        // evaluated and will get the value inserted by task1's async block
        // immediately.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                // Wait for 350 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(350)).await;
                let entry = cache3
                    .entry_by_ref(KEY)
                    .or_insert_with_if(async { unreachable!() }, |v| {
                        assert_eq!(v, &"task1");
                        false
                    })
                    .await;
                assert!(!entry.is_fresh());
                assert_eq!(entry.into_value(), "task1");
            }
        };

        // Task4 will be the fourth task to call `or_insert_with_if` for the same
        // key. The value should have been already inserted to the cache by task1.
        // However task4's `replace_if` closure returns `true`. So its async block
        // will be evaluated to replace the current value.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                // Wait for 400 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(400)).await;
                let entry = cache4
                    .entry_by_ref(KEY)
                    .or_insert_with_if(async { "task4" }, |v| {
                        assert_eq!(v, &"task1");
                        true
                    })
                    .await;
                assert!(entry.is_fresh());
                assert_eq!(entry.into_value(), "task4");
            }
        };

        // Task5 will call `get` for the same key. It will call when task1's async
        // block is still running, so it will get none for the key.
        let task5 = {
            let cache5 = cache.clone();
            async move {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200)).await;
                let maybe_v = cache5.get(KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task6 will call `get` for the same key. It will call after task1's async
        // block finished, so it will get the value inserted by task1's async block.
        let task6 = {
            let cache6 = cache.clone();
            async move {
                // Wait for 350 ms before calling `get`.
                sleep(Duration::from_millis(350)).await;
                let maybe_v = cache6.get(KEY).await;
                assert_eq!(maybe_v, Some("task1"));
            }
        };

        // Task7 will call `get` for the same key. It will call after task4's async
        // block finished, so it will get the value inserted by task4's async block.
        let task7 = {
            let cache7 = cache.clone();
            async move {
                // Wait for 450 ms before calling `get`.
                sleep(Duration::from_millis(450)).await;
                let maybe_v = cache7.get(KEY).await;
                assert_eq!(maybe_v, Some("task4"));
            }
        };

        futures_util::join!(task1, task2, task3, task4, task5, task6, task7);

        // All waiters should have been removed from the internal waiter map.
        assert!(cache.is_waiter_map_empty());
    }
3894
3895    #[tokio::test]
3896    async fn try_get_with() {
3897        use std::sync::Arc;
3898
3899        // Note that MyError does not implement std::error::Error trait
3900        // like anyhow::Error.
3901        #[derive(Debug)]
3902        pub struct MyError(#[allow(dead_code)] String);
3903
3904        type MyResult<T> = Result<T, Arc<MyError>>;
3905
3906        let cache = Cache::new(100);
3907        const KEY: u32 = 0;
3908
3909        // This test will run eight async tasks:
3910        //
3911        // Task1 will be the first task to call `get_with` for a key, so its async
3912        // block will be evaluated and then an error will be returned. Nothing will
3913        // be inserted to the cache.
3914        let task1 = {
3915            let cache1 = cache.clone();
3916            async move {
3917                // Call `try_get_with` immediately.
3918                let v = cache1
3919                    .try_get_with(KEY, async {
3920                        // Wait for 300 ms and return an error.
3921                        sleep(Duration::from_millis(300)).await;
3922                        Err(MyError("task1 error".into()))
3923                    })
3924                    .await;
3925                assert!(v.is_err());
3926            }
3927        };
3928
3929        // Task2 will be the second task to call `get_with` for the same key, so its
3930        // async block will not be evaluated. Once task1's async block finishes, it
3931        // will get the same error value returned by task1's async block.
3932        let task2 = {
3933            let cache2 = cache.clone();
3934            async move {
3935                // Wait for 100 ms before calling `try_get_with`.
3936                sleep(Duration::from_millis(100)).await;
3937                let v: MyResult<_> = cache2.try_get_with(KEY, async { unreachable!() }).await;
3938                assert!(v.is_err());
3939            }
3940        };
3941
3942        // Task3 will be the third task to call `get_with` for the same key. By the
3943        // time it calls, task1's async block should have finished already, but the
3944        // key still does not exist in the cache. So its async block will be
3945        // evaluated and then an okay &str value will be returned. That value will be
3946        // inserted to the cache.
3947        let task3 = {
3948            let cache3 = cache.clone();
3949            async move {
3950                // Wait for 400 ms before calling `try_get_with`.
3951                sleep(Duration::from_millis(400)).await;
3952                let v: MyResult<_> = cache3
3953                    .try_get_with(KEY, async {
3954                        // Wait for 300 ms and return an Ok(&str) value.
3955                        sleep(Duration::from_millis(300)).await;
3956                        Ok("task3")
3957                    })
3958                    .await;
3959                assert_eq!(v.unwrap(), "task3");
3960            }
3961        };
3962
3963        // Task4 will be the fourth task to call `get_with` for the same key. So its
3964        // async block will not be evaluated. Once task3's async block finishes, it
3965        // will get the same okay &str value.
3966        let task4 = {
3967            let cache4 = cache.clone();
3968            async move {
3969                // Wait for 500 ms before calling `try_get_with`.
3970                sleep(Duration::from_millis(500)).await;
3971                let v: MyResult<_> = cache4.try_get_with(KEY, async { unreachable!() }).await;
3972                assert_eq!(v.unwrap(), "task3");
3973            }
3974        };
3975
3976        // Task5 will be the fifth task to call `get_with` for the same key. So its
3977        // async block will not be evaluated. By the time it calls, task3's async
3978        // block should have finished already, so its async block will not be
3979        // evaluated and will get the value insert by task3's async block
3980        // immediately.
3981        let task5 = {
3982            let cache5 = cache.clone();
3983            async move {
3984                // Wait for 800 ms before calling `try_get_with`.
3985                sleep(Duration::from_millis(800)).await;
3986                let v: MyResult<_> = cache5.try_get_with(KEY, async { unreachable!() }).await;
3987                assert_eq!(v.unwrap(), "task3");
3988            }
3989        };
3990
3991        // Task6 will call `get` for the same key. It will call when task1's async
3992        // block is still running, so it will get none for the key.
3993        let task6 = {
3994            let cache6 = cache.clone();
3995            async move {
3996                // Wait for 200 ms before calling `get`.
3997                sleep(Duration::from_millis(200)).await;
3998                let maybe_v = cache6.get(&KEY).await;
3999                assert!(maybe_v.is_none());
4000            }
4001        };
4002
4003        // Task7 will call `get` for the same key. It will call after task1's async
4004        // block finished with an error. So it will get none for the key.
4005        let task7 = {
4006            let cache7 = cache.clone();
4007            async move {
4008                // Wait for 400 ms before calling `get`.
4009                sleep(Duration::from_millis(400)).await;
4010                let maybe_v = cache7.get(&KEY).await;
4011                assert!(maybe_v.is_none());
4012            }
4013        };
4014
4015        // Task8 will call `get` for the same key. It will call after task3's async
4016        // block finished, so it will get the value insert by task3's async block.
4017        let task8 = {
4018            let cache8 = cache.clone();
4019            async move {
4020                // Wait for 800 ms before calling `get`.
4021                sleep(Duration::from_millis(800)).await;
4022                let maybe_v = cache8.get(&KEY).await;
4023                assert_eq!(maybe_v, Some("task3"));
4024            }
4025        };
4026
4027        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4028
4029        assert!(cache.is_waiter_map_empty());
4030    }
4031
    #[tokio::test]
    async fn try_get_with_by_ref() {
        use std::sync::Arc;

        // Note that MyError does not implement std::error::Error trait
        // like anyhow::Error.
        #[derive(Debug)]
        pub struct MyError(#[allow(dead_code)] String);

        type MyResult<T> = Result<T, Arc<MyError>>;

        let cache = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run eight async tasks:
        //
        // Task1 will be the first task to call `try_get_with_by_ref` for a key, so
        // its async block will be evaluated and then an error will be returned.
        // Nothing will be inserted to the cache.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                // Call `try_get_with_by_ref` immediately.
                let v = cache1
                    .try_get_with_by_ref(KEY, async {
                        // Wait for 300 ms and return an error.
                        sleep(Duration::from_millis(300)).await;
                        Err(MyError("task1 error".into()))
                    })
                    .await;
                assert!(v.is_err());
            }
        };

        // Task2 will be the second task to call `try_get_with_by_ref` for the same
        // key, so its async block will not be evaluated. Once task1's async block
        // finishes, it will get the same error value returned by task1's async
        // block.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                // Wait for 100 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(100)).await;
                let v: MyResult<_> = cache2
                    .try_get_with_by_ref(KEY, async { unreachable!() })
                    .await;
                assert!(v.is_err());
            }
        };

        // Task3 will be the third task to call `try_get_with_by_ref` for the same
        // key. By the time it calls, task1's async block should have finished
        // already, but the key still does not exist in the cache. So its async
        // block will be evaluated and then an okay &str value will be returned.
        // That value will be inserted to the cache.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                // Wait for 400 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(400)).await;
                let v: MyResult<_> = cache3
                    .try_get_with_by_ref(KEY, async {
                        // Wait for 300 ms and return an Ok(&str) value.
                        sleep(Duration::from_millis(300)).await;
                        Ok("task3")
                    })
                    .await;
                assert_eq!(v.unwrap(), "task3");
            }
        };

        // Task4 will be the fourth task to call `try_get_with_by_ref` for the same
        // key. So its async block will not be evaluated. Once task3's async block
        // finishes, it will get the same okay &str value.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                // Wait for 500 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(500)).await;
                let v: MyResult<_> = cache4
                    .try_get_with_by_ref(KEY, async { unreachable!() })
                    .await;
                assert_eq!(v.unwrap(), "task3");
            }
        };

        // Task5 will be the fifth task to call `try_get_with_by_ref` for the same
        // key. By the time it calls, task3's async block should have finished
        // already, so its async block will not be evaluated and it will get the
        // value inserted by task3's async block immediately.
        let task5 = {
            let cache5 = cache.clone();
            async move {
                // Wait for 800 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(800)).await;
                let v: MyResult<_> = cache5
                    .try_get_with_by_ref(KEY, async { unreachable!() })
                    .await;
                assert_eq!(v.unwrap(), "task3");
            }
        };

        // Task6 will call `get` for the same key. It will call when task1's async
        // block is still running, so it will get none for the key.
        let task6 = {
            let cache6 = cache.clone();
            async move {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200)).await;
                let maybe_v = cache6.get(KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task7 will call `get` for the same key. It will call after task1's async
        // block finished with an error. So it will get none for the key.
        let task7 = {
            let cache7 = cache.clone();
            async move {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400)).await;
                let maybe_v = cache7.get(KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task8 will call `get` for the same key. It will call after task3's async
        // block finished, so it will get the value inserted by task3's async block.
        let task8 = {
            let cache8 = cache.clone();
            async move {
                // Wait for 800 ms before calling `get`.
                sleep(Duration::from_millis(800)).await;
                let maybe_v = cache8.get(KEY).await;
                assert_eq!(maybe_v, Some("task3"));
            }
        };

        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);

        // All waiters must have been removed from the value initializer.
        assert!(cache.is_waiter_map_empty());
    }
4174
4175    #[tokio::test]
4176    async fn optionally_get_with() {
4177        let cache = Cache::new(100);
4178        const KEY: u32 = 0;
4179
4180        // This test will run eight async tasks:
4181        //
4182        // Task1 will be the first task to call `optionally_get_with` for a key,
4183        // so its async block will be evaluated and then an None will be
4184        // returned. Nothing will be inserted to the cache.
4185        let task1 = {
4186            let cache1 = cache.clone();
4187            async move {
4188                // Call `try_get_with` immediately.
4189                let v = cache1
4190                    .optionally_get_with(KEY, async {
4191                        // Wait for 300 ms and return an None.
4192                        sleep(Duration::from_millis(300)).await;
4193                        None
4194                    })
4195                    .await;
4196                assert!(v.is_none());
4197            }
4198        };
4199
4200        // Task2 will be the second task to call `optionally_get_with` for the same
4201        // key, so its async block will not be evaluated. Once task1's async block
4202        // finishes, it will get the same error value returned by task1's async
4203        // block.
4204        let task2 = {
4205            let cache2 = cache.clone();
4206            async move {
4207                // Wait for 100 ms before calling `optionally_get_with`.
4208                sleep(Duration::from_millis(100)).await;
4209                let v = cache2
4210                    .optionally_get_with(KEY, async { unreachable!() })
4211                    .await;
4212                assert!(v.is_none());
4213            }
4214        };
4215
4216        // Task3 will be the third task to call `optionally_get_with` for the
4217        // same key. By the time it calls, task1's async block should have
4218        // finished already, but the key still does not exist in the cache. So
4219        // its async block will be evaluated and then an okay &str value will be
4220        // returned. That value will be inserted to the cache.
4221        let task3 = {
4222            let cache3 = cache.clone();
4223            async move {
4224                // Wait for 400 ms before calling `optionally_get_with`.
4225                sleep(Duration::from_millis(400)).await;
4226                let v = cache3
4227                    .optionally_get_with(KEY, async {
4228                        // Wait for 300 ms and return an Some(&str) value.
4229                        sleep(Duration::from_millis(300)).await;
4230                        Some("task3")
4231                    })
4232                    .await;
4233                assert_eq!(v.unwrap(), "task3");
4234            }
4235        };
4236
4237        // Task4 will be the fourth task to call `optionally_get_with` for the
4238        // same key. So its async block will not be evaluated. Once task3's
4239        // async block finishes, it will get the same okay &str value.
4240        let task4 = {
4241            let cache4 = cache.clone();
4242            async move {
4243                // Wait for 500 ms before calling `try_get_with`.
4244                sleep(Duration::from_millis(500)).await;
4245                let v = cache4
4246                    .optionally_get_with(KEY, async { unreachable!() })
4247                    .await;
4248                assert_eq!(v.unwrap(), "task3");
4249            }
4250        };
4251
4252        // Task5 will be the fifth task to call `optionally_get_with` for the
4253        // same key. So its async block will not be evaluated. By the time it
4254        // calls, task3's async block should have finished already, so its async
4255        // block will not be evaluated and will get the value insert by task3's
4256        // async block immediately.
4257        let task5 = {
4258            let cache5 = cache.clone();
4259            async move {
4260                // Wait for 800 ms before calling `optionally_get_with`.
4261                sleep(Duration::from_millis(800)).await;
4262                let v = cache5
4263                    .optionally_get_with(KEY, async { unreachable!() })
4264                    .await;
4265                assert_eq!(v.unwrap(), "task3");
4266            }
4267        };
4268
4269        // Task6 will call `get` for the same key. It will call when task1's async
4270        // block is still running, so it will get none for the key.
4271        let task6 = {
4272            let cache6 = cache.clone();
4273            async move {
4274                // Wait for 200 ms before calling `get`.
4275                sleep(Duration::from_millis(200)).await;
4276                let maybe_v = cache6.get(&KEY).await;
4277                assert!(maybe_v.is_none());
4278            }
4279        };
4280
4281        // Task7 will call `get` for the same key. It will call after task1's async
4282        // block finished with an error. So it will get none for the key.
4283        let task7 = {
4284            let cache7 = cache.clone();
4285            async move {
4286                // Wait for 400 ms before calling `get`.
4287                sleep(Duration::from_millis(400)).await;
4288                let maybe_v = cache7.get(&KEY).await;
4289                assert!(maybe_v.is_none());
4290            }
4291        };
4292
4293        // Task8 will call `get` for the same key. It will call after task3's async
4294        // block finished, so it will get the value insert by task3's async block.
4295        let task8 = {
4296            let cache8 = cache.clone();
4297            async move {
4298                // Wait for 800 ms before calling `get`.
4299                sleep(Duration::from_millis(800)).await;
4300                let maybe_v = cache8.get(&KEY).await;
4301                assert_eq!(maybe_v, Some("task3"));
4302            }
4303        };
4304
4305        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4306    }
4307
    #[tokio::test]
    async fn optionally_get_with_by_ref() {
        let cache = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run eight async tasks:
        //
        // Task1 will be the first task to call `optionally_get_with_by_ref` for a
        // key, so its async block will be evaluated and then `None` will be
        // returned. Nothing will be inserted to the cache.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                // Call `optionally_get_with_by_ref` immediately.
                let v = cache1
                    .optionally_get_with_by_ref(KEY, async {
                        // Wait for 300 ms and return `None`.
                        sleep(Duration::from_millis(300)).await;
                        None
                    })
                    .await;
                assert!(v.is_none());
            }
        };

        // Task2 will be the second task to call `optionally_get_with_by_ref` for the
        // same key, so its async block will not be evaluated. Once task1's async
        // block finishes, it will get the same `None` value returned by task1's
        // async block.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                // Wait for 100 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(100)).await;
                let v = cache2
                    .optionally_get_with_by_ref(KEY, async { unreachable!() })
                    .await;
                assert!(v.is_none());
            }
        };

        // Task3 will be the third task to call `optionally_get_with_by_ref` for the
        // same key. By the time it calls, task1's async block should have
        // finished already, but the key still does not exist in the cache. So
        // its async block will be evaluated and then an okay &str value will be
        // returned. That value will be inserted to the cache.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                // Wait for 400 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(400)).await;
                let v = cache3
                    .optionally_get_with_by_ref(KEY, async {
                        // Wait for 300 ms and return a Some(&str) value.
                        sleep(Duration::from_millis(300)).await;
                        Some("task3")
                    })
                    .await;
                assert_eq!(v.unwrap(), "task3");
            }
        };

        // Task4 will be the fourth task to call `optionally_get_with_by_ref` for the
        // same key. So its async block will not be evaluated. Once task3's
        // async block finishes, it will get the same okay &str value.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                // Wait for 500 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(500)).await;
                let v = cache4
                    .optionally_get_with_by_ref(KEY, async { unreachable!() })
                    .await;
                assert_eq!(v.unwrap(), "task3");
            }
        };

        // Task5 will be the fifth task to call `optionally_get_with_by_ref` for the
        // same key. By the time it calls, task3's async block should have
        // finished already, so its async block will not be evaluated and it
        // will get the value inserted by task3's async block immediately.
        let task5 = {
            let cache5 = cache.clone();
            async move {
                // Wait for 800 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(800)).await;
                let v = cache5
                    .optionally_get_with_by_ref(KEY, async { unreachable!() })
                    .await;
                assert_eq!(v.unwrap(), "task3");
            }
        };

        // Task6 will call `get` for the same key. It will call when task1's async
        // block is still running, so it will get none for the key.
        let task6 = {
            let cache6 = cache.clone();
            async move {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200)).await;
                let maybe_v = cache6.get(KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task7 will call `get` for the same key. It will call after task1's async
        // block finished with `None`. So it will get none for the key.
        let task7 = {
            let cache7 = cache.clone();
            async move {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400)).await;
                let maybe_v = cache7.get(KEY).await;
                assert!(maybe_v.is_none());
            }
        };

        // Task8 will call `get` for the same key. It will call after task3's async
        // block finished, so it will get the value inserted by task3's async block.
        let task8 = {
            let cache8 = cache.clone();
            async move {
                // Wait for 800 ms before calling `get`.
                sleep(Duration::from_millis(800)).await;
                let maybe_v = cache8.get(KEY).await;
                assert_eq!(maybe_v, Some("task3"));
            }
        };

        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);

        // All waiters must have been removed from the value initializer.
        assert!(cache.is_waiter_map_empty());
    }
4442
4443    #[tokio::test]
4444    async fn upsert_with() {
4445        let cache = Cache::new(100);
4446        const KEY: u32 = 0;
4447
4448        // Spawn three async tasks to call `and_upsert_with` for the same key and
4449        // each task increments the current value by 1. Ensure the key-level lock is
4450        // working by verifying the value is 3 after all tasks finish.
4451        //
4452        // |        |  task 1  |  task 2  |  task 3  |
4453        // |--------|----------|----------|----------|
4454        // |   0 ms | get none |          |          |
4455        // | 100 ms |          | blocked  |          |
4456        // | 200 ms | insert 1 |          |          |
4457        // |        |          | get 1    |          |
4458        // | 300 ms |          |          | blocked  |
4459        // | 400 ms |          | insert 2 |          |
4460        // |        |          |          | get 2    |
4461        // | 500 ms |          |          | insert 3 |
4462
4463        let task1 = {
4464            let cache1 = cache.clone();
4465            async move {
4466                cache1
4467                    .entry(KEY)
4468                    .and_upsert_with(|maybe_entry| async move {
4469                        sleep(Duration::from_millis(200)).await;
4470                        assert!(maybe_entry.is_none());
4471                        1
4472                    })
4473                    .await
4474            }
4475        };
4476
4477        let task2 = {
4478            let cache2 = cache.clone();
4479            async move {
4480                sleep(Duration::from_millis(100)).await;
4481                cache2
4482                    .entry_by_ref(&KEY)
4483                    .and_upsert_with(|maybe_entry| async move {
4484                        sleep(Duration::from_millis(200)).await;
4485                        let entry = maybe_entry.expect("The entry should exist");
4486                        entry.into_value() + 1
4487                    })
4488                    .await
4489            }
4490        };
4491
4492        let task3 = {
4493            let cache3 = cache.clone();
4494            async move {
4495                sleep(Duration::from_millis(300)).await;
4496                cache3
4497                    .entry_by_ref(&KEY)
4498                    .and_upsert_with(|maybe_entry| async move {
4499                        sleep(Duration::from_millis(100)).await;
4500                        let entry = maybe_entry.expect("The entry should exist");
4501                        entry.into_value() + 1
4502                    })
4503                    .await
4504            }
4505        };
4506
4507        let (ent1, ent2, ent3) = futures_util::join!(task1, task2, task3);
4508        assert_eq!(ent1.into_value(), 1);
4509        assert_eq!(ent2.into_value(), 2);
4510        assert_eq!(ent3.into_value(), 3);
4511
4512        assert_eq!(cache.get(&KEY).await, Some(3));
4513
4514        assert!(cache.is_waiter_map_empty());
4515    }
4516
    #[tokio::test]
    async fn compute_with() {
        use crate::ops::compute;
        use tokio::sync::RwLock;

        let cache = Cache::new(100);
        const KEY: u32 = 0;

        // Spawn six async tasks to call `and_compute_with` for the same key. Ensure
        // the key-level lock is working by verifying the value after all tasks
        // finish.
        //
        // |         |   task 1   |    task 2     |   task 3   |  task 4  |   task 5   | task 6  |
        // |---------|------------|---------------|------------|----------|------------|---------|
        // |    0 ms | get none   |               |            |          |            |         |
        // |  100 ms |            | blocked       |            |          |            |         |
        // |  200 ms | insert [1] |               |            |          |            |         |
        // |         |            | get [1]       |            |          |            |         |
        // |  300 ms |            |               | blocked    |          |            |         |
        // |  400 ms |            | insert [1, 2] |            |          |            |         |
        // |         |            |               | get [1, 2] |          |            |         |
        // |  500 ms |            |               |            | blocked  |            |         |
        // |  600 ms |            |               | remove     |          |            |         |
        // |         |            |               |            | get none |            |         |
        // |  700 ms |            |               |            |          | blocked    |         |
        // |  800 ms |            |               |            | nop      |            |         |
        // |         |            |               |            |          | get none   |         |
        // |  900 ms |            |               |            |          |            | blocked |
        // | 1000 ms |            |               |            |          | insert [5] |         |
        // |         |            |               |            |          |            | get [5] |
        // | 1100 ms |            |               |            |          |            | nop     |

        // Task 1: first caller; sees no entry and inserts [1] at 200 ms.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                cache1
                    .entry(KEY)
                    .and_compute_with(|maybe_entry| async move {
                        sleep(Duration::from_millis(200)).await;
                        assert!(maybe_entry.is_none());
                        compute::Op::Put(Arc::new(RwLock::new(vec![1])))
                    })
                    .await
            }
        };

        // Task 2: blocks on the key-level lock, then sees [1], mutates the shared
        // Vec in place to [1, 2], and re-puts the same Arc.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                sleep(Duration::from_millis(100)).await;
                cache2
                    .entry_by_ref(&KEY)
                    .and_compute_with(|maybe_entry| async move {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().await, vec![1]);
                        sleep(Duration::from_millis(200)).await;
                        value.write().await.push(2);
                        compute::Op::Put(value)
                    })
                    .await
            }
        };

        // Task 3: sees [1, 2] and removes the entry.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                sleep(Duration::from_millis(300)).await;
                cache3
                    .entry(KEY)
                    .and_compute_with(|maybe_entry| async move {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().await, vec![1, 2]);
                        sleep(Duration::from_millis(200)).await;
                        compute::Op::Remove
                    })
                    .await
            }
        };

        // Task 4: runs after the removal; sees no entry and does nothing.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                sleep(Duration::from_millis(500)).await;
                cache4
                    .entry(KEY)
                    .and_compute_with(|maybe_entry| async move {
                        assert!(maybe_entry.is_none());
                        sleep(Duration::from_millis(200)).await;
                        compute::Op::Nop
                    })
                    .await
            }
        };

        // Task 5: still sees no entry and inserts a fresh [5].
        let task5 = {
            let cache5 = cache.clone();
            async move {
                sleep(Duration::from_millis(700)).await;
                cache5
                    .entry_by_ref(&KEY)
                    .and_compute_with(|maybe_entry| async move {
                        assert!(maybe_entry.is_none());
                        sleep(Duration::from_millis(200)).await;
                        compute::Op::Put(Arc::new(RwLock::new(vec![5])))
                    })
                    .await
            }
        };

        // Task 6: sees [5] and leaves the entry unchanged.
        let task6 = {
            let cache6 = cache.clone();
            async move {
                sleep(Duration::from_millis(900)).await;
                cache6
                    .entry_by_ref(&KEY)
                    .and_compute_with(|maybe_entry| async move {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().await, vec![5]);
                        sleep(Duration::from_millis(100)).await;
                        compute::Op::Nop
                    })
                    .await
            }
        };

        let (res1, res2, res3, res4, res5, res6) =
            futures_util::join!(task1, task2, task3, task4, task5, task6);

        // Task 1 inserted a new entry.
        let compute::CompResult::Inserted(entry) = res1 else {
            panic!("Expected `Inserted`. Got {res1:?}")
        };
        assert_eq!(
            *entry.into_value().read().await,
            vec![1, 2] // The same Vec was modified by task2.
        );

        // Task 2 replaced the value of an existing entry.
        let compute::CompResult::ReplacedWith(entry) = res2 else {
            panic!("Expected `ReplacedWith`. Got {res2:?}")
        };
        assert_eq!(*entry.into_value().read().await, vec![1, 2]);

        // Task 3 removed the entry and got the removed value back.
        let compute::CompResult::Removed(entry) = res3 else {
            panic!("Expected `Removed`. Got {res3:?}")
        };
        assert_eq!(*entry.into_value().read().await, vec![1, 2]);

        // Task 4 found no entry and left it that way.
        let compute::CompResult::StillNone(key) = res4 else {
            panic!("Expected `StillNone`. Got {res4:?}")
        };
        assert_eq!(*key, KEY);

        // Task 5 inserted a new entry after the removal.
        let compute::CompResult::Inserted(entry) = res5 else {
            panic!("Expected `Inserted`. Got {res5:?}")
        };
        assert_eq!(*entry.into_value().read().await, vec![5]);

        // Task 6 performed a Nop on an existing entry.
        let compute::CompResult::Unchanged(entry) = res6 else {
            panic!("Expected `Unchanged`. Got {res6:?}")
        };
        assert_eq!(*entry.into_value().read().await, vec![5]);

        // All waiters must have been removed from the value initializer.
        assert!(cache.is_waiter_map_empty());
    }
4683
    #[tokio::test]
    async fn try_compute_with() {
        use crate::ops::compute;
        use tokio::sync::RwLock;

        let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
        const KEY: u32 = 0;

        // Spawn four async tasks to call `and_try_compute_with` for the same key.
        // Ensure the key-level lock is working by verifying the value after all
        // tasks finish.
        //
        // |         |   task 1   |    task 2     |   task 3   |  task 4    |
        // |---------|------------|---------------|------------|------------|
        // |    0 ms | get none   |               |            |            |
        // |  100 ms |            | blocked       |            |            |
        // |  200 ms | insert [1] |               |            |            |
        // |         |            | get [1]       |            |            |
        // |  300 ms |            |               | blocked    |            |
        // |  400 ms |            | insert [1, 2] |            |            |
        // |         |            |               | get [1, 2] |            |
        // |  500 ms |            |               |            | blocked    |
        // |  600 ms |            |               | err        |            |
        // |         |            |               |            | get [1, 2] |
        // |  700 ms |            |               |            | remove     |
        //
        // This test is shorter than `compute_with` test because this one omits `Nop`
        // cases.

        // Task 1: Starts first, finds no existing entry, and inserts `[1]`.
        let task1 = {
            let cache1 = cache.clone();
            async move {
                cache1
                    .entry(KEY)
                    .and_try_compute_with(|maybe_entry| async move {
                        sleep(Duration::from_millis(200)).await;
                        assert!(maybe_entry.is_none());
                        Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
                    })
                    .await
            }
        };

        // Task 2: Blocks on the key-level lock until task 1 finishes, then sees
        // `[1]` and puts back the same `Vec` after mutating it to `[1, 2]`.
        let task2 = {
            let cache2 = cache.clone();
            async move {
                sleep(Duration::from_millis(100)).await;
                cache2
                    .entry_by_ref(&KEY)
                    .and_try_compute_with(|maybe_entry| async move {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().await, vec![1]);
                        sleep(Duration::from_millis(200)).await;
                        value.write().await.push(2);
                        Ok(compute::Op::Put(value)) as Result<_, ()>
                    })
                    .await
            }
        };

        // Task 3: Blocks until task 2 finishes, sees `[1, 2]`, then returns an
        // error. The error must leave the cached entry unchanged.
        let task3 = {
            let cache3 = cache.clone();
            async move {
                sleep(Duration::from_millis(300)).await;
                cache3
                    .entry(KEY)
                    .and_try_compute_with(|maybe_entry| async move {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().await, vec![1, 2]);
                        sleep(Duration::from_millis(200)).await;
                        Err(())
                    })
                    .await
            }
        };

        // Task 4: Blocks until task 3 finishes. Since task 3's error must not
        // have modified the entry, it still sees `[1, 2]` and removes it.
        let task4 = {
            let cache4 = cache.clone();
            async move {
                sleep(Duration::from_millis(500)).await;
                cache4
                    .entry(KEY)
                    .and_try_compute_with(|maybe_entry| async move {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().await, vec![1, 2]);
                        sleep(Duration::from_millis(100)).await;
                        Ok(compute::Op::Remove) as Result<_, ()>
                    })
                    .await
            }
        };

        // Run all four tasks concurrently and then verify each outcome.
        let (res1, res2, res3, res4) = futures_util::join!(task1, task2, task3, task4);

        let Ok(compute::CompResult::Inserted(entry)) = res1 else {
            panic!("Expected `Inserted`. Got {res1:?}")
        };
        assert_eq!(
            *entry.into_value().read().await,
            vec![1, 2] // The same Vec was modified by task2.
        );

        let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
            panic!("Expected `ReplacedWith`. Got {res2:?}")
        };
        assert_eq!(*entry.into_value().read().await, vec![1, 2]);

        assert!(res3.is_err());

        let Ok(compute::CompResult::Removed(entry)) = res4 else {
            panic!("Expected `Removed`. Got {res4:?}")
        };
        assert_eq!(
            *entry.into_value().read().await,
            vec![1, 2] // Removed value.
        );

        // All key-level locks (waiters) must have been released.
        assert!(cache.is_waiter_map_empty());
    }
4806
4807    #[tokio::test]
4808    // https://github.com/moka-rs/moka/issues/43
4809    async fn handle_panic_in_get_with() {
4810        use tokio::time::{sleep, Duration};
4811
4812        let cache = Cache::new(16);
4813        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4814        {
4815            let cache_ref = cache.clone();
4816            let semaphore_ref = semaphore.clone();
4817            tokio::task::spawn(async move {
4818                let _ = cache_ref
4819                    .get_with(1, async move {
4820                        semaphore_ref.add_permits(1);
4821                        sleep(Duration::from_millis(50)).await;
4822                        panic!("Panic during try_get_with");
4823                    })
4824                    .await;
4825            });
4826        }
4827        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4828        assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4829    }
4830
4831    #[tokio::test]
4832    // https://github.com/moka-rs/moka/issues/43
4833    async fn handle_panic_in_try_get_with() {
4834        use tokio::time::{sleep, Duration};
4835
4836        let cache = Cache::new(16);
4837        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4838        {
4839            let cache_ref = cache.clone();
4840            let semaphore_ref = semaphore.clone();
4841            tokio::task::spawn(async move {
4842                let _ = cache_ref
4843                    .try_get_with(1, async move {
4844                        semaphore_ref.add_permits(1);
4845                        sleep(Duration::from_millis(50)).await;
4846                        panic!("Panic during try_get_with");
4847                    })
4848                    .await as Result<_, Arc<Infallible>>;
4849            });
4850        }
4851        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4852        assert_eq!(
4853            cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4854            Ok(5)
4855        );
4856
4857        assert!(cache.is_waiter_map_empty());
4858    }
4859
4860    #[tokio::test]
4861    // https://github.com/moka-rs/moka/issues/59
4862    async fn abort_get_with() {
4863        use tokio::time::{sleep, Duration};
4864
4865        let cache = Cache::new(16);
4866        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4867
4868        let handle;
4869        {
4870            let cache_ref = cache.clone();
4871            let semaphore_ref = semaphore.clone();
4872
4873            handle = tokio::task::spawn(async move {
4874                let _ = cache_ref
4875                    .get_with(1, async move {
4876                        semaphore_ref.add_permits(1);
4877                        sleep(Duration::from_millis(50)).await;
4878                        unreachable!();
4879                    })
4880                    .await;
4881            });
4882        }
4883
4884        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4885        handle.abort();
4886
4887        assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4888
4889        assert!(cache.is_waiter_map_empty());
4890    }
4891
4892    #[tokio::test]
4893    // https://github.com/moka-rs/moka/issues/59
4894    async fn abort_try_get_with() {
4895        use tokio::time::{sleep, Duration};
4896
4897        let cache = Cache::new(16);
4898        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4899
4900        let handle;
4901        {
4902            let cache_ref = cache.clone();
4903            let semaphore_ref = semaphore.clone();
4904
4905            handle = tokio::task::spawn(async move {
4906                let _ = cache_ref
4907                    .try_get_with(1, async move {
4908                        semaphore_ref.add_permits(1);
4909                        sleep(Duration::from_millis(50)).await;
4910                        unreachable!();
4911                    })
4912                    .await as Result<_, Arc<Infallible>>;
4913            });
4914        }
4915
4916        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4917        handle.abort();
4918
4919        assert_eq!(
4920            cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4921            Ok(5)
4922        );
4923
4924        assert!(cache.is_waiter_map_empty());
4925    }
4926
    #[tokio::test]
    async fn test_removal_notifications() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener that records every notification it receives.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        // An explicit `invalidate` should notify with `RemovalCause::Explicit`.
        cache.insert('a', "alice").await;
        cache.invalidate(&'a').await;
        expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));

        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 0);

        // Fill the cache up to its max capacity (3).
        cache.insert('b', "bob").await;
        cache.insert('c', "cathy").await;
        cache.insert('d', "david").await;
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 3);

        // This will be rejected due to the size constraint, notifying with
        // `RemovalCause::Size` for the rejected entry itself.
        cache.insert('e', "emily").await;
        expected.push((Arc::new('e'), "emily", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 3);

        // Raise the popularity of 'e' so it will be accepted next time.
        cache.get(&'e').await;
        cache.run_pending_tasks().await;

        // Retry.
        cache.insert('e', "eliza").await;
        // and the LRU entry will be evicted, also with `RemovalCause::Size`.
        expected.push((Arc::new('b'), "bob", RemovalCause::Size));
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 3);

        // Replace an existing entry, notifying with `RemovalCause::Replaced`.
        cache.insert('d', "dennis").await;
        expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 3);

        verify_notification_vec(&cache, actual, &expected).await;
    }
4991
    #[tokio::test]
    async fn test_removal_notifications_with_updates() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener that records every notification it receives.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Mocked clock so the test can advance time deterministically.
        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener and also TTL and TTI.
        let mut cache = Cache::builder()
            .async_eviction_listener(listener)
            .time_to_live(Duration::from_secs(7))
            .time_to_idle(Duration::from_secs(5))
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("alice", "a0").await;
        cache.run_pending_tasks().await;

        // Now alice (a0) has been expired by the idle timeout (TTI).
        mock.increment(Duration::from_secs(6));
        expected.push((Arc::new("alice"), "a0", RemovalCause::Expired));
        assert_eq!(cache.get(&"alice").await, None);

        // We have not ran sync after the expiration of alice (a0), so it is
        // still in the cache.
        assert_eq!(cache.entry_count(), 1);

        // Re-insert alice with a different value. Since alice (a0) is still
        // in the cache, this is actually a replace operation rather than an
        // insert operation. We want to verify that the RemovalCause of a0 is
        // Expired, not Replaced.
        cache.insert("alice", "a1").await;
        cache.run_pending_tasks().await;

        // Access alice (a1) before the TTI elapses to keep it alive.
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&"alice").await, Some("a1"));
        cache.run_pending_tasks().await;

        // Now alice has been expired by time-to-live (TTL).
        mock.increment(Duration::from_secs(4));
        expected.push((Arc::new("alice"), "a1", RemovalCause::Expired));
        assert_eq!(cache.get(&"alice").await, None);

        // But, again, it is still in the cache.
        assert_eq!(cache.entry_count(), 1);

        // Re-insert alice with a different value and verify that the
        // RemovalCause of a1 is Expired (not Replaced).
        cache.insert("alice", "a2").await;
        cache.run_pending_tasks().await;

        assert_eq!(cache.entry_count(), 1);

        // Now alice (a2) has been expired by the idle timeout.
        mock.increment(Duration::from_secs(6));
        expected.push((Arc::new("alice"), "a2", RemovalCause::Expired));
        assert_eq!(cache.get(&"alice").await, None);
        assert_eq!(cache.entry_count(), 1);

        // This invalidate will internally remove alice (a2).
        cache.invalidate(&"alice").await;
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 0);

        // Re-insert, and this time, make it expired by the TTL.
        cache.insert("alice", "a3").await;
        cache.run_pending_tasks().await;
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&"alice").await, Some("a3"));
        cache.run_pending_tasks().await;
        mock.increment(Duration::from_secs(4));
        expected.push((Arc::new("alice"), "a3", RemovalCause::Expired));
        assert_eq!(cache.get(&"alice").await, None);
        assert_eq!(cache.entry_count(), 1);

        // This invalidate will internally remove alice (a3).
        cache.invalidate(&"alice").await;
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), 0);

        verify_notification_vec(&cache, actual, &expected).await;
    }
5089
    // When the eviction listener is not set, calling `run_pending_tasks` once should
    // evict all entries that can be removed.
    #[tokio::test]
    async fn no_batch_size_limit_on_eviction() {
        const MAX_CAPACITY: u64 = 20;

        const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
        const MAX_LOG_SYNC_REPEATS: u32 = 1;
        const EVICTION_BATCH_SIZE: u32 = 1;

        let hk_conf = HousekeeperConfig::new(
            // Timeout should be ignored when the eviction listener is not provided.
            Some(EVICTION_TIMEOUT),
            Some(MAX_LOG_SYNC_REPEATS),
            Some(EVICTION_BATCH_SIZE),
        );

        // Create a cache with the LRU policy.
        let mut cache = Cache::builder()
            .max_capacity(MAX_CAPACITY)
            .eviction_policy(EvictionPolicy::lru())
            .housekeeper_config(hk_conf)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        // Fill the cache.
        for i in 0..MAX_CAPACITY {
            let v = format!("v{i}");
            cache.insert(i, v).await
        }
        // The entry count should not change because we have not called
        // `run_pending_tasks` yet.
        assert_eq!(cache.entry_count(), 0);

        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), MAX_CAPACITY);

        // Insert more items to the cache.
        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
            let v = format!("v{i}");
            cache.insert(i, v).await
        }
        // The entry count should not change because we have not called
        // `run_pending_tasks` yet.
        assert_eq!(cache.entry_count(), MAX_CAPACITY);
        // Both old and new keys should exist.
        assert!(cache.contains_key(&0)); // old
        assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new

        // Process the remaining write op logs (there should be MAX_CAPACITY logs),
        // and evict the LRU entries.
        cache.run_pending_tasks().await;
        assert_eq!(cache.entry_count(), MAX_CAPACITY);

        // Now all the old keys should be gone.
        assert!(!cache.contains_key(&0));
        assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
        // And the new keys should exist.
        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
    }
5154
    #[tokio::test]
    async fn slow_eviction_listener() {
        const MAX_CAPACITY: u64 = 20;

        const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
        const LISTENER_DELAY: Duration = Duration::from_millis(11);
        const MAX_LOG_SYNC_REPEATS: u32 = 1;
        const EVICTION_BATCH_SIZE: u32 = 1;

        let hk_conf = HousekeeperConfig::new(
            Some(EVICTION_TIMEOUT),
            Some(MAX_LOG_SYNC_REPEATS),
            Some(EVICTION_BATCH_SIZE),
        );

        let (clock, mock) = Clock::mock();
        let listener_call_count = Arc::new(AtomicU8::new(0));
        let lcc = Arc::clone(&listener_call_count);

        // A slow eviction listener that spends `LISTENER_DELAY` to process a removal
        // notification. (The delay is simulated by advancing the mocked clock.)
        let listener = move |_k, _v, _cause| {
            mock.increment(LISTENER_DELAY);
            lcc.fetch_add(1, Ordering::AcqRel);
        };

        // Create a cache with the LRU policy.
        let mut cache = Cache::builder()
            .max_capacity(MAX_CAPACITY)
            .eviction_policy(EvictionPolicy::lru())
            .eviction_listener(listener)
            .housekeeper_config(hk_conf)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        // Fill the cache.
        for i in 0..MAX_CAPACITY {
            let v = format!("v{i}");
            cache.insert(i, v).await
        }
        // The entry count should not change because we have not called
        // `run_pending_tasks` yet.
        assert_eq!(cache.entry_count(), 0);

        cache.run_pending_tasks().await;
        assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
        assert_eq!(cache.entry_count(), MAX_CAPACITY);

        // Insert more items to the cache.
        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
            let v = format!("v{i}");
            cache.insert(i, v).await
        }
        assert_eq!(cache.entry_count(), MAX_CAPACITY);

        cache.run_pending_tasks().await;
        // Because of the slow listener, cache should get an over capacity.
        // Only three notifications fit within EVICTION_TIMEOUT per call
        // (3 * 11 ms = 33 ms > 30 ms), so only three entries are evicted here.
        let mut expected_call_count = 3;
        assert_eq!(
            listener_call_count.load(Ordering::Acquire) as u64,
            expected_call_count
        );
        assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);

        // Keep calling `run_pending_tasks` until all MAX_CAPACITY excess entries
        // have been evicted, three at a time.
        loop {
            cache.run_pending_tasks().await;

            expected_call_count += 3;
            if expected_call_count > MAX_CAPACITY {
                // The last batch may be smaller than three.
                expected_call_count = MAX_CAPACITY;
            }

            let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
            assert_eq!(actual_count, expected_call_count);
            let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
            assert_eq!(cache.entry_count(), expected_entry_count);

            if expected_call_count >= MAX_CAPACITY {
                break;
            }
        }

        assert_eq!(cache.entry_count(), MAX_CAPACITY);
    }
5243
    // NOTE: To enable the panic logging, run the following command:
    //
    // RUST_LOG=moka=info cargo test --features 'future, logging' -- \
    //   future::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
    //
    #[tokio::test]
    async fn recover_from_panicking_eviction_listener() {
        #[cfg(feature = "logging")]
        let _ = env_logger::builder().is_test(true).try_init();

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener that panics when it sees
        // a value "panic now!".
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| -> ListenerFuture {
            let a2 = Arc::clone(&a1);
            async move {
                if v == "panic now!" {
                    panic!("Panic now!");
                }
                a2.lock().await.push((k, v, cause));
            }
            .boxed()
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .name("My Future Cache")
            .async_eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        // Insert an okay value.
        cache.insert("alice", "a0").await;
        cache.run_pending_tasks().await;

        // Insert a value that will cause the eviction listener to panic.
        // (The listener is notified for the replaced "a0" here, which is fine.)
        cache.insert("alice", "panic now!").await;
        expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
        cache.run_pending_tasks().await;

        // Insert an okay value. This will replace the previous
        // value "panic now!" so the eviction listener will panic.
        cache.insert("alice", "a2").await;
        cache.run_pending_tasks().await;
        // No more removal notification should be sent.

        // Invalidate the okay value.
        cache.invalidate(&"alice").await;
        cache.run_pending_tasks().await;

        verify_notification_vec(&cache, actual, &expected).await;
    }
5303
    #[tokio::test]
    async fn cancel_future_while_running_pending_tasks() {
        use crate::future::FutureExt;
        use futures_util::future::poll_immediate;
        use tokio::task::yield_now;

        // Counters for how many times the eviction listener was started and how
        // many times it ran to completion.
        let listener_initiation_count: Arc<AtomicU32> = Default::default();
        let listener_completion_count: Arc<AtomicU32> = Default::default();

        let listener = {
            // Variables to capture.
            let init_count = Arc::clone(&listener_initiation_count);
            let comp_count = Arc::clone(&listener_completion_count);

            // Our eviction listener closure. The `yield_now` between the two
            // counter updates guarantees the returned future cannot complete on
            // its very first poll.
            move |_k, _v, _r| {
                init_count.fetch_add(1, Ordering::AcqRel);
                let comp_count1 = Arc::clone(&comp_count);

                async move {
                    yield_now().await;
                    comp_count1.fetch_add(1, Ordering::AcqRel);
                }
                .boxed()
            }
        };

        // Mocked clock so the test can control entry expiration.
        let (clock, mock) = Clock::mock();

        let mut cache: Cache<u32, u32> = Cache::builder()
            .time_to_live(Duration::from_millis(10))
            .async_eviction_listener(listener)
            .clock(clock)
            .build();

        cache.reconfigure_for_testing().await;

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(1, 1).await;
        // NOTE(review): `run_pending_tasks_*_count` appear to be test-only
        // counters defined on the cache elsewhere in this file — confirm.
        assert_eq!(cache.run_pending_tasks_initiation_count(), 0);
        assert_eq!(cache.run_pending_tasks_completion_count(), 0);

        // Key 1 is not yet expired.
        mock.increment(Duration::from_millis(7));

        cache.run_pending_tasks().await;
        assert_eq!(cache.run_pending_tasks_initiation_count(), 1);
        assert_eq!(cache.run_pending_tasks_completion_count(), 1);
        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 0);
        assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);

        // Now key 1 is expired, so the eviction listener should be called when we
        // call run_pending_tasks() and poll the returned future.
        mock.increment(Duration::from_millis(7));

        let fut = cache.run_pending_tasks();
        // Poll the fut only once, and drop it. The fut should not be completed (so
        // it is cancelled) because the eviction listener performed a yield_now().
        assert!(poll_immediate(fut).await.is_none());

        // The task is initiated but not completed.
        assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
        assert_eq!(cache.run_pending_tasks_completion_count(), 1);
        // The listener is initiated but not completed.
        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
        assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);

        // This will resume the task and the listener, and continue polling
        // until complete.
        cache.run_pending_tasks().await;
        // Now the task is completed.
        assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
        assert_eq!(cache.run_pending_tasks_completion_count(), 2);
        // Now the listener is completed.
        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
    }
5383
5384    #[tokio::test]
5385    async fn cancel_future_while_calling_eviction_listener() {
5386        use crate::future::FutureExt;
5387        use futures_util::future::poll_immediate;
5388        use tokio::task::yield_now;
5389
5390        let listener_initiation_count: Arc<AtomicU32> = Default::default();
5391        let listener_completion_count: Arc<AtomicU32> = Default::default();
5392
5393        let listener = {
5394            // Variables to capture.
5395            let init_count = Arc::clone(&listener_initiation_count);
5396            let comp_count = Arc::clone(&listener_completion_count);
5397
5398            // Our eviction listener closure.
5399            move |_k, _v, _r| {
5400                init_count.fetch_add(1, Ordering::AcqRel);
5401                let comp_count1 = Arc::clone(&comp_count);
5402
5403                async move {
5404                    yield_now().await;
5405                    comp_count1.fetch_add(1, Ordering::AcqRel);
5406                }
5407                .boxed()
5408            }
5409        };
5410
5411        let mut cache: Cache<u32, u32> = Cache::builder()
5412            .time_to_live(Duration::from_millis(10))
5413            .async_eviction_listener(listener)
5414            .build();
5415
5416        cache.reconfigure_for_testing().await;
5417
5418        // Make the cache exterior immutable.
5419        let cache = cache;
5420
5421        // ------------------------------------------------------------
5422        // Interrupt the eviction listener while calling `insert`
5423        // ------------------------------------------------------------
5424
5425        cache.insert(1, 1).await;
5426
5427        let fut = cache.insert(1, 2);
5428        // Poll the fut only once, and drop it. The fut should not be completed (so
5429        // it is cancelled) because the eviction listener performed a yield_now().
5430        assert!(poll_immediate(fut).await.is_none());
5431
5432        // The listener is initiated but not completed.
5433        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5434        assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5435
5436        // This will call retry_interrupted_ops() and resume the interrupted
5437        // listener.
5438        cache.run_pending_tasks().await;
5439        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5440        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5441
5442        // ------------------------------------------------------------
5443        // Interrupt the eviction listener while calling `invalidate`
5444        // ------------------------------------------------------------
5445
5446        let fut = cache.invalidate(&1);
5447        // Cancel the fut after one poll.
5448        assert!(poll_immediate(fut).await.is_none());
5449        // The listener is initiated but not completed.
5450        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5451        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5452
5453        // This will call retry_interrupted_ops() and resume the interrupted
5454        // listener.
5455        cache.get(&99).await;
5456        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5457        assert_eq!(listener_completion_count.load(Ordering::Acquire), 2);
5458
5459        // ------------------------------------------------------------
5460        // Ensure retry_interrupted_ops() is called
5461        // ------------------------------------------------------------
5462
5463        // Repeat the same test with `insert`, but this time, call different methods
5464        // to ensure retry_interrupted_ops() is called.
5465        let prepare = || async {
5466            cache.invalidate(&1).await;
5467
5468            // Reset the counters.
5469            listener_initiation_count.store(0, Ordering::Release);
5470            listener_completion_count.store(0, Ordering::Release);
5471
5472            cache.insert(1, 1).await;
5473
5474            let fut = cache.insert(1, 2);
5475            // Poll the fut only once, and drop it. The fut should not be completed (so
5476            // it is cancelled) because the eviction listener performed a yield_now().
5477            assert!(poll_immediate(fut).await.is_none());
5478
5479            // The listener is initiated but not completed.
5480            assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5481            assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5482        };
5483
5484        // Methods to test:
5485        //
5486        // - run_pending_tasks (Already tested in a previous test)
5487        // - get               (Already tested in a previous test)
5488        // - insert
5489        // - invalidate
5490        // - remove
5491
5492        // insert
5493        prepare().await;
5494        cache.insert(99, 99).await;
5495        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5496        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5497
5498        // invalidate
5499        prepare().await;
5500        cache.invalidate(&88).await;
5501        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5502        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5503
5504        // remove
5505        prepare().await;
5506        cache.remove(&77).await;
5507        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5508        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5509    }
5510
5511    #[tokio::test]
5512    async fn cancel_future_while_scheduling_write_op() {
5513        use futures_util::future::poll_immediate;
5514
5515        let mut cache: Cache<u32, u32> = Cache::builder().build();
5516        cache.reconfigure_for_testing().await;
5517
5518        // Make the cache exterior immutable.
5519        let cache = cache;
5520
5521        // --------------------------------------------------------------
5522        // Interrupt `insert` while blocking in `schedule_write_op`
5523        // --------------------------------------------------------------
5524
5525        cache
5526            .schedule_write_op_should_block
5527            .store(true, Ordering::Release);
5528        let fut = cache.insert(1, 1);
5529        // Poll the fut only once, and drop it. The fut should not be completed (so
5530        // it is cancelled) because schedule_write_op should be awaiting for a lock.
5531        assert!(poll_immediate(fut).await.is_none());
5532
5533        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5534        assert_eq!(cache.base.write_op_ch.len(), 0);
5535
5536        // This should retry the interrupted operation.
5537        cache
5538            .schedule_write_op_should_block
5539            .store(false, Ordering::Release);
5540        cache.get(&99).await;
5541        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5542        assert_eq!(cache.base.write_op_ch.len(), 1);
5543
5544        cache.run_pending_tasks().await;
5545        assert_eq!(cache.base.write_op_ch.len(), 0);
5546
5547        // --------------------------------------------------------------
5548        // Interrupt `invalidate` while blocking in `schedule_write_op`
5549        // --------------------------------------------------------------
5550
5551        cache
5552            .schedule_write_op_should_block
5553            .store(true, Ordering::Release);
5554        let fut = cache.invalidate(&1);
5555        // Poll the fut only once, and drop it. The fut should not be completed (so
5556        // it is cancelled) because schedule_write_op should be awaiting for a lock.
5557        assert!(poll_immediate(fut).await.is_none());
5558
5559        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5560        assert_eq!(cache.base.write_op_ch.len(), 0);
5561
5562        // This should retry the interrupted operation.
5563        cache
5564            .schedule_write_op_should_block
5565            .store(false, Ordering::Release);
5566        cache.get(&99).await;
5567        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5568        assert_eq!(cache.base.write_op_ch.len(), 1);
5569
5570        cache.run_pending_tasks().await;
5571        assert_eq!(cache.base.write_op_ch.len(), 0);
5572    }
5573
5574    // This test ensures that the `contains_key`, `get` and `invalidate` can use
5575    // borrowed form `&[u8]` for key with type `Vec<u8>`.
5576    // https://github.com/moka-rs/moka/issues/166
5577    #[tokio::test]
5578    async fn borrowed_forms_of_key() {
5579        let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5580
5581        let key = vec![1_u8];
5582        cache.insert(key.clone(), ()).await;
5583
5584        // key as &Vec<u8>
5585        let key_v: &Vec<u8> = &key;
5586        assert!(cache.contains_key(key_v));
5587        assert_eq!(cache.get(key_v).await, Some(()));
5588        cache.invalidate(key_v).await;
5589
5590        cache.insert(key, ()).await;
5591
5592        // key as &[u8]
5593        let key_s: &[u8] = &[1_u8];
5594        assert!(cache.contains_key(key_s));
5595        assert_eq!(cache.get(key_s).await, Some(()));
5596        cache.invalidate(key_s).await;
5597    }
5598
5599    #[tokio::test]
5600    async fn drop_value_immediately_after_eviction() {
5601        use crate::common::test_utils::{Counters, Value};
5602
5603        const MAX_CAPACITY: u32 = 500;
5604        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
5605
5606        let counters = Arc::new(Counters::default());
5607        let counters1 = Arc::clone(&counters);
5608
5609        let listener = move |_k, _v, cause| match cause {
5610            RemovalCause::Size => counters1.incl_evicted(),
5611            RemovalCause::Explicit => counters1.incl_invalidated(),
5612            _ => (),
5613        };
5614
5615        let mut cache = Cache::builder()
5616            .max_capacity(MAX_CAPACITY as u64)
5617            .eviction_listener(listener)
5618            .build();
5619        cache.reconfigure_for_testing().await;
5620
5621        // Make the cache exterior immutable.
5622        let cache = cache;
5623
5624        for key in 0..KEYS {
5625            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
5626            cache.insert(key, value).await;
5627            counters.incl_inserted();
5628            cache.run_pending_tasks().await;
5629        }
5630
5631        let eviction_count = KEYS - MAX_CAPACITY;
5632
5633        // Retries will be needed when testing in a QEMU VM.
5634        const MAX_RETRIES: usize = 5;
5635        let mut retries = 0;
5636        loop {
5637            // Ensure all scheduled notifications have been processed.
5638            std::thread::sleep(Duration::from_millis(500));
5639
5640            if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
5641                if retries <= MAX_RETRIES {
5642                    retries += 1;
5643                    cache.run_pending_tasks().await;
5644                    continue;
5645                } else {
5646                    assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
5647                    assert_eq!(
5648                        counters.value_dropped(),
5649                        eviction_count,
5650                        "Retries exhausted"
5651                    );
5652                }
5653            }
5654
5655            assert_eq!(counters.inserted(), KEYS, "inserted");
5656            assert_eq!(counters.value_created(), KEYS, "value_created");
5657            assert_eq!(counters.evicted(), eviction_count, "evicted");
5658            assert_eq!(counters.invalidated(), 0, "invalidated");
5659            assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
5660
5661            break;
5662        }
5663
5664        for key in 0..KEYS {
5665            cache.invalidate(&key).await;
5666            cache.run_pending_tasks().await;
5667        }
5668
5669        let mut retries = 0;
5670        loop {
5671            // Ensure all scheduled notifications have been processed.
5672            std::thread::sleep(Duration::from_millis(500));
5673
5674            if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
5675                if retries <= MAX_RETRIES {
5676                    retries += 1;
5677                    cache.run_pending_tasks().await;
5678                    continue;
5679                } else {
5680                    assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
5681                    assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
5682                }
5683            }
5684
5685            assert_eq!(counters.inserted(), KEYS, "inserted");
5686            assert_eq!(counters.value_created(), KEYS, "value_created");
5687            assert_eq!(counters.evicted(), eviction_count, "evicted");
5688            assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
5689            assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5690
5691            break;
5692        }
5693
5694        std::mem::drop(cache);
5695        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5696    }
5697
5698    // https://github.com/moka-rs/moka/issues/383
5699    #[tokio::test]
5700    async fn ensure_gc_runs_when_dropping_cache() {
5701        let cache = Cache::builder().build();
5702        let val = Arc::new(0);
5703        cache
5704            .get_with(1, std::future::ready(Arc::clone(&val)))
5705            .await;
5706        drop(cache);
5707        assert_eq!(Arc::strong_count(&val), 1);
5708    }
5709
5710    #[tokio::test]
5711    async fn test_debug_format() {
5712        let cache = Cache::new(10);
5713        cache.insert('a', "alice").await;
5714        cache.insert('b', "bob").await;
5715        cache.insert('c', "cindy").await;
5716
5717        let debug_str = format!("{cache:?}");
5718        assert!(debug_str.starts_with('{'));
5719        assert!(debug_str.contains(r#"'a': "alice""#));
5720        assert!(debug_str.contains(r#"'b': "bob""#));
5721        assert!(debug_str.contains(r#"'c': "cindy""#));
5722        assert!(debug_str.ends_with('}'));
5723    }
5724
5725    type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5726
5727    async fn verify_notification_vec<K, V, S>(
5728        cache: &Cache<K, V, S>,
5729        actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5730        expected: &[NotificationTuple<K, V>],
5731    ) where
5732        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5733        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5734        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5735    {
5736        // Retries will be needed when testing in a QEMU VM.
5737        const MAX_RETRIES: usize = 5;
5738        let mut retries = 0;
5739        loop {
5740            // Ensure all scheduled notifications have been processed.
5741            std::thread::sleep(Duration::from_millis(500));
5742
5743            let actual = &*actual.lock().await;
5744            if actual.len() != expected.len() {
5745                if retries <= MAX_RETRIES {
5746                    retries += 1;
5747                    cache.run_pending_tasks().await;
5748                    continue;
5749                } else {
5750                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5751                }
5752            }
5753
5754            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5755                assert_eq!(actual, expected, "expected[{i}]");
5756            }
5757
5758            break;
5759        }
5760    }
5761}