moka/future/
cache.rs

1use super::{
2    base_cache::BaseCache,
3    value_initializer::{InitResult, ValueInitializer},
4    CacheBuilder, CancelGuard, Iter, OwnedKeyEntrySelector, PredicateId, RefKeyEntrySelector,
5    WriteOp,
6};
7use crate::{
8    common::{concurrent::Weigher, time::Clock, HousekeeperConfig},
9    notification::AsyncEvictionListener,
10    ops::compute::{self, CompResult},
11    policy::{EvictionPolicy, ExpirationPolicy},
12    Entry, Policy, PredicateError,
13};
14
15#[cfg(feature = "unstable-debug-counters")]
16use crate::common::concurrent::debug_counters::CacheDebugStats;
17
18use std::{
19    borrow::Borrow,
20    collections::hash_map::RandomState,
21    fmt,
22    future::Future,
23    hash::{BuildHasher, Hash},
24    pin::Pin,
25    sync::Arc,
26};
27
28#[cfg(test)]
29use std::sync::atomic::{AtomicBool, Ordering};
30
31/// A thread-safe, futures-aware concurrent in-memory cache.
32///
33/// `Cache` supports full concurrency of retrievals and a high expected concurrency
34/// for updates. It utilizes a lock-free concurrent hash table as the central
35/// key-value storage. It performs a best-effort bounding of the map using an entry
36/// replacement algorithm to determine which entries to evict when the capacity is
37/// exceeded.
38///
39/// To use this cache, enable a crate feature called "future".
40///
41/// # Table of Contents
42///
43/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
44/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
45/// - [Sharing a cache across asynchronous tasks](#sharing-a-cache-across-asynchronous-tasks)
46///     - [No lock is needed](#no-lock-is-needed)
47/// - [Hashing Algorithm](#hashing-algorithm)
48/// - [Example: Size-based Eviction](#example-size-based-eviction)
49/// - [Example: Time-based Expirations](#example-time-based-expirations)
50///     - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
51///     - [Per-entry expiration policy](#per-entry-expiration-policy)
52/// - [Example: Eviction Listener](#example-eviction-listener)
53///     - [You should avoid eviction listener to panic](#you-should-avoid-eviction-listener-to-panic)
54///
55/// # Example: `insert`, `get` and `invalidate`
56///
57/// Cache entries are manually added using [`insert`](#method.insert) of
58/// [`get_with`](#method.get_with) method, and are stored in the cache until either
59/// evicted or manually invalidated:
60///
61/// Here's an example of reading and updating a cache by using multiple asynchronous
62/// tasks with [Tokio][tokio-crate] runtime:
63///
64/// [tokio-crate]: https://crates.io/crates/tokio
65///
66///```rust
67/// // Cargo.toml
68/// //
69/// // [dependencies]
70/// // moka = { version = "0.12", features = ["future"] }
71/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
72/// // futures-util = "0.3"
73///
74/// use moka::future::Cache;
75///
76/// #[tokio::main]
77/// async fn main() {
78///     const NUM_TASKS: usize = 16;
79///     const NUM_KEYS_PER_TASK: usize = 64;
80///
81///     fn value(n: usize) -> String {
82///         format!("value {n}")
83///     }
84///
85///     // Create a cache that can store up to 10,000 entries.
86///     let cache = Cache::new(10_000);
87///
88///     // Spawn async tasks and write to and read from the cache.
89///     let tasks: Vec<_> = (0..NUM_TASKS)
90///         .map(|i| {
91///             // To share the same cache across the async tasks, clone it.
92///             // This is a cheap operation.
93///             let my_cache = cache.clone();
94///             let start = i * NUM_KEYS_PER_TASK;
95///             let end = (i + 1) * NUM_KEYS_PER_TASK;
96///
97///             tokio::spawn(async move {
98///                 // Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
99///                 for key in start..end {
100///                     my_cache.insert(key, value(key)).await;
101///                     // get() returns Option<String>, a clone of the stored value.
102///                     assert_eq!(my_cache.get(&key).await, Some(value(key)));
103///                 }
104///
105///                 // Invalidate every 4 element of the inserted entries.
106///                 for key in (start..end).step_by(4) {
107///                     my_cache.invalidate(&key).await;
108///                 }
109///             })
110///         })
111///         .collect();
112///
113///     // Wait for all tasks to complete.
114///     futures_util::future::join_all(tasks).await;
115///
116///     // Verify the result.
117///     for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
118///         if key % 4 == 0 {
119///             assert_eq!(cache.get(&key).await, None);
120///         } else {
121///             assert_eq!(cache.get(&key).await, Some(value(key)));
122///         }
123///     }
124/// }
125/// ```
126///
127/// If you want to atomically initialize and insert a value when the key is not
128/// present, you might want to check other insertion methods
129/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
130///
131/// # Avoiding to clone the value at `get`
132///
133/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
134/// time `get` is called for an existing key, it creates a clone of the stored value
135/// `V` and returns it. This is because the `Cache` allows concurrent updates from
136/// threads so a value stored in the cache can be dropped or replaced at any time by
137/// any other thread. `get` cannot return a reference `&V` as it is impossible to
138/// guarantee the value outlives the reference.
139///
140/// If you want to store values that will be expensive to clone, wrap them by
141/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
142/// thread-safe reference-counted pointer and its `clone()` method is cheap.
143///
144/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
145///
146/// # Sharing a cache across asynchronous tasks
147///
148/// To share a cache across async tasks (or OS threads), do one of the followings:
149///
150/// - Create a clone of the cache by calling its `clone` method and pass it to other
151///   task.
152/// - If you are using a web application framework such as Actix Web or Axum, you can
153///   store a cache in Actix Web's [`web::Data`][actix-web-data] or Axum's
154///   [shared state][axum-state-extractor], and access it from each request handler.
155/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from
156///   [once_cell][once-cell-crate] create, and set it to a `static` variable.
157///
158/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
159/// reference-counted pointers to the internal data structures.
160///
161/// [once-cell-crate]: https://crates.io/crates/once_cell
162/// [actix-web-data]: https://docs.rs/actix-web/4.3.1/actix_web/web/struct.Data.html
163/// [axum-state-extractor]: https://docs.rs/axum/latest/axum/#sharing-state-with-handlers
164///
165/// ## No lock is needed
166///
167/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
168/// by the `Cache` are considered thread-safe, and can be safely called by multiple
169/// async tasks at the same time. No lock is needed.
170///
171/// [once-cell-crate]: https://crates.io/crates/once_cell
172///
173/// # Hashing Algorithm
174///
175/// By default, `Cache` uses a hashing algorithm selected to provide resistance
176/// against HashDoS attacks. It will be the same one used by
177/// `std::collections::HashMap`, which is currently SipHash 1-3.
178///
179/// While SipHash's performance is very competitive for medium sized keys, other
180/// hashing algorithms will outperform it for small keys such as integers as well as
181/// large keys such as long strings. However those algorithms will typically not
182/// protect against attacks such as HashDoS.
183///
184/// The hashing algorithm can be replaced on a per-`Cache` basis using the
185/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
186/// Many alternative algorithms are available on crates.io, such as the
187/// [AHash][ahash-crate] crate.
188///
189/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
190/// [ahash-crate]: https://crates.io/crates/ahash
191///
192/// # Example: Size-based Eviction
193///
194/// ```rust
195/// // Cargo.toml
196/// //
197/// // [dependencies]
198/// // moka = { version = "0.12", features = ["future"] }
199/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
200/// // futures-util = "0.3"
201///
202/// use moka::future::Cache;
203///
204/// #[tokio::main]
205/// async fn main() {
206///     // Evict based on the number of entries in the cache.
207///     let cache = Cache::builder()
208///         // Up to 10,000 entries.
209///         .max_capacity(10_000)
210///         // Create the cache.
211///         .build();
212///     cache.insert(1, "one".to_string()).await;
213///
214///     // Evict based on the byte length of strings in the cache.
215///     let cache = Cache::builder()
216///         // A weigher closure takes &K and &V and returns a u32
217///         // representing the relative size of the entry.
218///         .weigher(|_key, value: &String| -> u32 {
219///             value.len().try_into().unwrap_or(u32::MAX)
220///         })
221///         // This cache will hold up to 32MiB of values.
222///         .max_capacity(32 * 1024 * 1024)
223///         .build();
224///     cache.insert(2, "two".to_string()).await;
225/// }
226/// ```
227///
228/// If your cache should not grow beyond a certain size, use the `max_capacity`
229/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
230/// will try to evict entries that have not been used recently or very often.
231///
232/// At the cache creation time, a weigher closure can be set by the `weigher` method
233/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
234/// returns a `u32` representing the relative size of the entry:
235///
236/// - If the `weigher` is _not_ set, the cache will treat each entry has the same
237///   size of `1`. This means the cache will be bounded by the number of entries.
238/// - If the `weigher` is set, the cache will call the weigher to calculate the
239///   weighted size (relative size) on an entry. This means the cache will be bounded
240///   by the total weighted size of entries.
241///
242/// Note that weighted sizes are not used when making eviction selections.
243///
244/// [builder-struct]: ./struct.CacheBuilder.html
245///
246/// # Example: Time-based Expirations
247///
248/// ## Cache-level TTL and TTI policies
249///
250/// `Cache` supports the following cache-level expiration policies:
251///
252/// - **Time to live (TTL)**: A cached entry will be expired after the specified
253///   duration past from `insert`.
254/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
255///   duration past from `get` or `insert`.
256///
257/// They are a cache-level expiration policies; all entries in the cache will have
258/// the same TTL and/or TTI durations. If you want to set different expiration
259/// durations for different entries, see the next section.
260///
261/// ```rust
262/// // Cargo.toml
263/// //
264/// // [dependencies]
265/// // moka = { version = "0.12", features = ["future"] }
266/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
267/// // futures-util = "0.3"
268///
269/// use moka::future::Cache;
270/// use std::time::Duration;
271///
272/// #[tokio::main]
273/// async fn main() {
274///     let cache = Cache::builder()
275///         // Time to live (TTL): 30 minutes
276///         .time_to_live(Duration::from_secs(30 * 60))
277///         // Time to idle (TTI):  5 minutes
278///         .time_to_idle(Duration::from_secs( 5 * 60))
279///         // Create the cache.
280///         .build();
281///
282///     // This entry will expire after 5 minutes (TTI) if there is no get().
283///     cache.insert(0, "zero").await;
284///
285///     // This get() will extend the entry life for another 5 minutes.
286///     cache.get(&0);
287///
288///     // Even though we keep calling get(), the entry will expire
289///     // after 30 minutes (TTL) from the insert().
290/// }
291/// ```
292///
293/// ## Per-entry expiration policy
294///
295/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
296///
297/// `Expiry` trait provides three callback methods:
298/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
299/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
300/// updated, one of these methods is called. These methods return an
301/// `Option<Duration>`, which is used as the expiration duration of the entry.
302///
303/// `Expiry` trait provides the default implementations of these methods, so you will
304/// implement only the methods you want to customize.
305///
306/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
307/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
308/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
309///
310/// ```rust
311/// // Cargo.toml
312/// //
313/// // [dependencies]
314/// // moka = { version = "0.12", features = ["future"] }
315/// // tokio = { version = "1", features = ["rt-multi-thread", "macros", "time" ] }
316///
317/// use moka::{future::Cache, Expiry};
318/// use std::time::{Duration, Instant};
319///
320/// // In this example, we will create a `future::Cache` with `u32` as the key, and
321/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
322/// // expiration of the value, and `String` is the application data of the value.
323///
324/// /// An enum to represent the expiration of a value.
325/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
326/// pub enum Expiration {
327///     /// The value never expires.
328///     Never,
329///     /// The value expires after a short time. (5 seconds in this example)
330///     AfterShortTime,
331///     /// The value expires after a long time. (15 seconds in this example)
332///     AfterLongTime,
333/// }
334///
335/// impl Expiration {
336///     /// Returns the duration of this expiration.
337///     pub fn as_duration(&self) -> Option<Duration> {
338///         match self {
339///             Expiration::Never => None,
340///             Expiration::AfterShortTime => Some(Duration::from_secs(5)),
341///             Expiration::AfterLongTime => Some(Duration::from_secs(15)),
342///         }
343///     }
344/// }
345///
346/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
347/// /// default implementations of three callback methods `expire_after_create`,
348/// /// `expire_after_read`, and `expire_after_update`.
349/// ///
350/// /// In this example, we only override the `expire_after_create` method.
351/// pub struct MyExpiry;
352///
353/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
354///     /// Returns the duration of the expiration of the value that was just
355///     /// created.
356///     fn expire_after_create(
357///         &self,
358///         _key: &u32,
359///         value: &(Expiration, String),
360///         _current_time: Instant,
361///     ) -> Option<Duration> {
362///         let duration = value.0.as_duration();
363///         println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
364///         duration
365///     }
366/// }
367///
368/// #[tokio::main]
369/// async fn main() {
370///     // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
371///     // eviction listener.
372///     let expiry = MyExpiry;
373///
374///     let eviction_listener = |key, _value, cause| {
375///         println!("Evicted key {key}. Cause: {cause:?}");
376///     };
377///
378///     let cache = Cache::builder()
379///         .max_capacity(100)
380///         .expire_after(expiry)
381///         .eviction_listener(eviction_listener)
382///         .build();
383///
384///     // Insert some entries into the cache with different expirations.
385///     cache
386///         .get_with(0, async { (Expiration::AfterShortTime, "a".to_string()) })
387///         .await;
388///
389///     cache
390///         .get_with(1, async { (Expiration::AfterLongTime, "b".to_string()) })
391///         .await;
392///
393///     cache
394///         .get_with(2, async { (Expiration::Never, "c".to_string()) })
395///         .await;
396///
397///     // Verify that all the inserted entries exist.
398///     assert!(cache.contains_key(&0));
399///     assert!(cache.contains_key(&1));
400///     assert!(cache.contains_key(&2));
401///
402///     // Sleep for 6 seconds. Key 0 should expire.
403///     println!("\nSleeping for 6 seconds...\n");
404///     tokio::time::sleep(Duration::from_secs(6)).await;
405///     cache.run_pending_tasks().await;
406///     println!("Entry count: {}", cache.entry_count());
407///
408///     // Verify that key 0 has been evicted.
409///     assert!(!cache.contains_key(&0));
410///     assert!(cache.contains_key(&1));
411///     assert!(cache.contains_key(&2));
412///
413///     // Sleep for 10 more seconds. Key 1 should expire.
414///     println!("\nSleeping for 10 seconds...\n");
415///     tokio::time::sleep(Duration::from_secs(10)).await;
416///     cache.run_pending_tasks().await;
417///     println!("Entry count: {}", cache.entry_count());
418///
419///     // Verify that key 1 has been evicted.
420///     assert!(!cache.contains_key(&1));
421///     assert!(cache.contains_key(&2));
422///
423///     // Manually invalidate key 2.
424///     cache.invalidate(&2).await;
425///     assert!(!cache.contains_key(&2));
426///
427///     println!("\nSleeping for a second...\n");
428///     tokio::time::sleep(Duration::from_secs(1)).await;
429///     cache.run_pending_tasks().await;
430///     println!("Entry count: {}", cache.entry_count());
431///
432///     println!("\nDone!");
433/// }
434/// ```
435///
436/// # Example: Eviction Listener
437///
438/// A `Cache` can be configured with an eviction listener, a closure that is called
439/// every time there is a cache eviction. The listener takes three parameters: the
440/// key and value of the evicted entry, and the
441/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
442/// entry was evicted.
443///
444/// An eviction listener can be used to keep other data structures in sync with the
445/// cache, for example.
446///
447/// The following example demonstrates how to use an eviction listener with
448/// time-to-live expiration to manage the lifecycle of temporary files on a
449/// filesystem. The cache stores the paths of the files, and when one of them has
450/// expired, the eviction listener will be called with the path, so it can remove the
451/// file from the filesystem.
452///
453/// ```rust
454/// // Cargo.toml
455/// //
456/// // [dependencies]
457/// // anyhow = "1.0"
458/// // uuid = { version = "1.1", features = ["v4"] }
459/// // tokio = { version = "1.18", features = ["fs", "macros", "rt-multi-thread", "sync", "time"] }
460///
461/// use moka::{future::Cache, notification::ListenerFuture};
462/// // FutureExt trait provides the boxed method.
463/// use moka::future::FutureExt;
464///
465/// use anyhow::{anyhow, Context};
466/// use std::{
467///     io,
468///     path::{Path, PathBuf},
469///     sync::Arc,
470///     time::Duration,
471/// };
472/// use tokio::{fs, sync::RwLock};
473/// use uuid::Uuid;
474///
475/// /// The DataFileManager writes, reads and removes data files.
476/// struct DataFileManager {
477///     base_dir: PathBuf,
478///     file_count: usize,
479/// }
480///
481/// impl DataFileManager {
482///     fn new(base_dir: PathBuf) -> Self {
483///         Self {
484///             base_dir,
485///             file_count: 0,
486///         }
487///     }
488///
489///     async fn write_data_file(
490///         &mut self,
491///         key: impl AsRef<str>,
492///         contents: String
493///     ) -> io::Result<PathBuf> {
494///         // Use the key as a part of the filename.
495///         let mut path = self.base_dir.to_path_buf();
496///         path.push(key.as_ref());
497///
498///         assert!(!path.exists(), "Path already exists: {path:?}");
499///
500///         // create the file at the path and write the contents to the file.
501///         fs::write(&path, contents).await?;
502///         self.file_count += 1;
503///         println!("Created a data file at {path:?} (file count: {})", self.file_count);
504///         Ok(path)
505///     }
506///
507///     async fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
508///         // Reads the contents of the file at the path, and return the contents.
509///         fs::read_to_string(path).await
510///     }
511///
512///     async fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
513///         // Remove the file at the path.
514///         fs::remove_file(path.as_ref()).await?;
515///         self.file_count -= 1;
516///         println!(
517///             "Removed a data file at {:?} (file count: {})",
518///             path.as_ref(),
519///             self.file_count
520///         );
521///
522///         Ok(())
523///     }
524/// }
525///
526/// #[tokio::main]
527/// async fn main() -> anyhow::Result<()> {
528///     // Create an instance of the DataFileManager and wrap it with
529///     // Arc<RwLock<_>> so it can be shared across threads.
530///     let mut base_dir = std::env::temp_dir();
531///     base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
532///     println!("base_dir: {base_dir:?}");
533///     std::fs::create_dir(&base_dir)?;
534///
535///     let file_mgr = DataFileManager::new(base_dir);
536///     let file_mgr = Arc::new(RwLock::new(file_mgr));
537///
538///     let file_mgr1 = Arc::clone(&file_mgr);
539///     let rt = tokio::runtime::Handle::current();
540///
541///     // Create an eviction listener closure.
542///     let eviction_listener = move |k, v: PathBuf, cause| -> ListenerFuture {
543///         println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
544///         let file_mgr2 = Arc::clone(&file_mgr1);
545///
546///         // Create a Future that removes the data file at the path `v`.
547///         async move {
548///             // Acquire the write lock of the DataFileManager.
549///             let mut mgr = file_mgr2.write().await;
550///             // Remove the data file. We must handle error cases here to
551///             // prevent the listener from panicking.
552///             if let Err(_e) = mgr.remove_data_file(v.as_path()).await {
553///                 eprintln!("Failed to remove a data file at {v:?}");
554///             }
555///         }
556///         // Convert the regular Future into ListenerFuture. This method is
557///         // provided by moka::future::FutureExt trait.
558///         .boxed()
559///     };
560///
561///     // Create the cache. Set time to live for two seconds and set the
562///     // eviction listener.
563///     let cache = Cache::builder()
564///         .max_capacity(100)
565///         .time_to_live(Duration::from_secs(2))
566///         .async_eviction_listener(eviction_listener)
567///         .build();
568///
569///     // Insert an entry to the cache.
570///     // This will create and write a data file for the key "user1", store the
571///     // path of the file to the cache, and return it.
572///     println!("== try_get_with()");
573///     let key = "user1";
574///     let path = cache
575///         .try_get_with(key, async {
576///             let mut mgr = file_mgr.write().await;
577///             let path = mgr
578///                 .write_data_file(key, "user data".into())
579///                 .await
580///                 .with_context(|| format!("Failed to create a data file"))?;
581///             Ok(path) as anyhow::Result<_>
582///         })
583///         .await
584///         .map_err(|e| anyhow!("{e}"))?;
585///
586///     // Read the data file at the path and print the contents.
587///     println!("\n== read_data_file()");
588///     {
589///         let mgr = file_mgr.read().await;
590///         let contents = mgr
591///             .read_data_file(path.as_path())
592///             .await
593///             .with_context(|| format!("Failed to read data from {path:?}"))?;
594///         println!("contents: {contents}");
595///     }
596///
597///     // Sleep for five seconds. While sleeping, the cache entry for key "user1"
598///     // will be expired and evicted, so the eviction listener will be called to
599///     // remove the file.
600///     tokio::time::sleep(Duration::from_secs(5)).await;
601///
602///     cache.run_pending_tasks();
603///
604///     Ok(())
605/// }
606/// ```
607///
608/// ## You should avoid eviction listener to panic
609///
610/// It is very important to make an eviction listener closure not to panic.
611/// Otherwise, the cache will stop calling the listener after a panic. This is an
612/// intended behavior because the cache cannot know whether it is memory safe or not
613/// to call the panicked listener again.
614///
615/// When a listener panics, the cache will swallow the panic and disable the
616/// listener. If you want to know when a listener panics and the reason of the panic,
617/// you can enable an optional `logging` feature of Moka and check error-level logs.
618///
619/// To enable the `logging`, do the followings:
620///
621/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
622/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
623///    `info`, ...):
624///     - If you are using the `env_logger` crate, you can achieve this by setting
625///       `RUST_LOG` environment variable to `moka=error`.
626/// 3. If you have more than one caches, you may want to set a distinct name for each
627///    cache by using cache builder's [`name`][builder-name-method] method. The name
628///    will appear in the log.
629///
630/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
631///
632pub struct Cache<K, V, S = RandomState> {
633    pub(crate) base: BaseCache<K, V, S>,
634    value_initializer: Arc<ValueInitializer<K, V, S>>,
635
636    #[cfg(test)]
637    schedule_write_op_should_block: AtomicBool,
638}
639
640// TODO: https://github.com/moka-rs/moka/issues/54
641#[allow(clippy::non_send_fields_in_send_ty)]
642unsafe impl<K, V, S> Send for Cache<K, V, S>
643where
644    K: Send + Sync,
645    V: Send + Sync,
646    S: Send,
647{
648}
649
650unsafe impl<K, V, S> Sync for Cache<K, V, S>
651where
652    K: Send + Sync,
653    V: Send + Sync,
654    S: Sync,
655{
656}
657
658// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
659impl<K, V, S> Clone for Cache<K, V, S> {
660    /// Makes a clone of this shared cache.
661    ///
662    /// This operation is cheap as it only creates thread-safe reference counted
663    /// pointers to the shared internal data structures.
664    fn clone(&self) -> Self {
665        Self {
666            base: self.base.clone(),
667            value_initializer: Arc::clone(&self.value_initializer),
668
669            #[cfg(test)]
670            schedule_write_op_should_block: AtomicBool::new(
671                self.schedule_write_op_should_block.load(Ordering::Acquire),
672            ),
673        }
674    }
675}
676
677impl<K, V, S> fmt::Debug for Cache<K, V, S>
678where
679    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
680    V: fmt::Debug + Clone + Send + Sync + 'static,
681    // TODO: Remove these bounds from S.
682    S: BuildHasher + Clone + Send + Sync + 'static,
683{
684    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
685        let mut d_map = f.debug_map();
686
687        for (k, v) in self {
688            d_map.entry(&k, &v);
689        }
690
691        d_map.finish()
692    }
693}
694
695impl<K, V, S> Cache<K, V, S> {
696    /// Returns cache’s name.
697    pub fn name(&self) -> Option<&str> {
698        self.base.name()
699    }
700
701    /// Returns a read-only cache policy of this cache.
702    ///
703    /// At this time, cache policy cannot be modified after cache creation.
704    /// A future version may support to modify it.
705    pub fn policy(&self) -> Policy {
706        self.base.policy()
707    }
708
709    /// Returns an approximate number of entries in this cache.
710    ///
711    /// The value returned is _an estimate_; the actual count may differ if there are
712    /// concurrent insertions or removals, or if some entries are pending removal due
713    /// to expiration. This inaccuracy can be mitigated by calling
714    /// `run_pending_tasks` first.
715    ///
716    /// # Example
717    ///
718    /// ```rust
719    /// // Cargo.toml
720    /// //
721    /// // [dependencies]
722    /// // moka = { version = "0.12", features = ["future"] }
723    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
724    /// use moka::future::Cache;
725    ///
726    /// #[tokio::main]
727    /// async fn main() {
728    ///     let cache = Cache::new(10);
729    ///     cache.insert('n', "Netherland Dwarf").await;
730    ///     cache.insert('l', "Lop Eared").await;
731    ///     cache.insert('d', "Dutch").await;
732    ///
733    ///     // Ensure an entry exists.
734    ///     assert!(cache.contains_key(&'n'));
735    ///
736    ///     // However, followings may print stale number zeros instead of threes.
737    ///     println!("{}", cache.entry_count());   // -> 0
738    ///     println!("{}", cache.weighted_size()); // -> 0
739    ///
740    ///     // To mitigate the inaccuracy, call `run_pending_tasks` to run pending
741    ///     // internal tasks.
742    ///     cache.run_pending_tasks().await;
743    ///
744    ///     // Followings will print the actual numbers.
745    ///     println!("{}", cache.entry_count());   // -> 3
746    ///     println!("{}", cache.weighted_size()); // -> 3
747    /// }
748    /// ```
749    ///
750    pub fn entry_count(&self) -> u64 {
751        self.base.entry_count()
752    }
753
754    /// Returns an approximate total weighted size of entries in this cache.
755    ///
756    /// The value returned is _an estimate_; the actual size may differ if there are
757    /// concurrent insertions or removals, or if some entries are pending removal due
758    /// to expiration. This inaccuracy can be mitigated by calling
759    /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for a
760    /// sample code.
761    pub fn weighted_size(&self) -> u64 {
762        self.base.weighted_size()
763    }
764
765    #[cfg(feature = "unstable-debug-counters")]
766    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-debug-counters")))]
767    pub async fn debug_stats(&self) -> CacheDebugStats {
768        self.base.debug_stats().await
769    }
770}
771
772impl<K, V> Cache<K, V, RandomState>
773where
774    K: Hash + Eq + Send + Sync + 'static,
775    V: Clone + Send + Sync + 'static,
776{
777    /// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
778    ///
779    /// To adjust various configuration knobs such as `initial_capacity` or
780    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
781    ///
782    /// [builder-struct]: ./struct.CacheBuilder.html
783    pub fn new(max_capacity: u64) -> Self {
784        let build_hasher = RandomState::default();
785        Self::with_everything(
786            None,
787            Some(max_capacity),
788            None,
789            build_hasher,
790            None,
791            EvictionPolicy::default(),
792            None,
793            ExpirationPolicy::default(),
794            HousekeeperConfig::default(),
795            false,
796            Clock::default(),
797        )
798    }
799
800    /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` with
801    /// various configuration knobs.
802    ///
803    /// [builder-struct]: ./struct.CacheBuilder.html
804    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
805        CacheBuilder::default()
806    }
807}
808
809impl<K, V, S> Cache<K, V, S>
810where
811    K: Hash + Eq + Send + Sync + 'static,
812    V: Clone + Send + Sync + 'static,
813    S: BuildHasher + Clone + Send + Sync + 'static,
814{
815    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
816    #[allow(clippy::too_many_arguments)]
817    pub(crate) fn with_everything(
818        name: Option<String>,
819        max_capacity: Option<u64>,
820        initial_capacity: Option<usize>,
821        build_hasher: S,
822        weigher: Option<Weigher<K, V>>,
823        eviction_policy: EvictionPolicy,
824        eviction_listener: Option<AsyncEvictionListener<K, V>>,
825        expiration_policy: ExpirationPolicy<K, V>,
826        housekeeper_config: HousekeeperConfig,
827        invalidator_enabled: bool,
828        clock: Clock,
829    ) -> Self {
830        Self {
831            base: BaseCache::new(
832                name,
833                max_capacity,
834                initial_capacity,
835                build_hasher.clone(),
836                weigher,
837                eviction_policy,
838                eviction_listener,
839                expiration_policy,
840                housekeeper_config,
841                invalidator_enabled,
842                clock,
843            ),
844            value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
845
846            #[cfg(test)]
847            schedule_write_op_should_block: Default::default(), // false
848        }
849    }
850
851    /// Returns `true` if the cache contains a value for the key.
852    ///
853    /// Unlike the `get` method, this method is not considered a cache read operation,
854    /// so it does not update the historic popularity estimator or reset the idle
855    /// timer for the key.
856    ///
857    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
858    /// on the borrowed form _must_ match those for the key type.
859    pub fn contains_key<Q>(&self, key: &Q) -> bool
860    where
861        K: Borrow<Q>,
862        Q: Hash + Eq + ?Sized,
863    {
864        self.base.contains_key_with_hash(key, self.base.hash(key))
865    }
866
867    /// Returns a _clone_ of the value corresponding to the key.
868    ///
869    /// If you want to store values that will be expensive to clone, wrap them by
870    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
871    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
872    ///
873    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
874    /// on the borrowed form _must_ match those for the key type.
875    ///
876    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
877    pub async fn get<Q>(&self, key: &Q) -> Option<V>
878    where
879        K: Borrow<Q>,
880        Q: Hash + Eq + ?Sized,
881    {
882        let ignore_if = None as Option<&mut fn(&V) -> bool>;
883
884        self.base
885            .get_with_hash(key, self.base.hash(key), ignore_if, false, true)
886            .await
887            .map(Entry::into_value)
888    }
889
890    /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
891    /// select or insert an entry.
892    ///
893    /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
894    ///
895    /// # Example
896    ///
897    /// ```rust
898    /// // Cargo.toml
899    /// //
900    /// // [dependencies]
901    /// // moka = { version = "0.12", features = ["future"] }
902    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
903    ///
904    /// use moka::future::Cache;
905    ///
906    /// #[tokio::main]
907    /// async fn main() {
908    ///     let cache: Cache<String, u32> = Cache::new(100);
909    ///     let key = "key1".to_string();
910    ///
911    ///     let entry = cache.entry(key.clone()).or_insert(3).await;
912    ///     assert!(entry.is_fresh());
913    ///     assert_eq!(entry.key(), &key);
914    ///     assert_eq!(entry.into_value(), 3);
915    ///
916    ///     let entry = cache.entry(key).or_insert(6).await;
917    ///     // Not fresh because the value was already in the cache.
918    ///     assert!(!entry.is_fresh());
919    ///     assert_eq!(entry.into_value(), 3);
920    /// }
921    /// ```
922    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
923    where
924        K: Hash + Eq,
925    {
926        let hash = self.base.hash(&key);
927        OwnedKeyEntrySelector::new(key, hash, self)
928    }
929
930    /// Takes a reference `&Q` of a key and returns an [`RefKeyEntrySelector`] that
931    /// can be used to select or insert an entry.
932    ///
933    /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
934    ///
935    /// # Example
936    ///
937    /// ```rust
938    /// // Cargo.toml
939    /// //
940    /// // [dependencies]
941    /// // moka = { version = "0.12", features = ["future"] }
942    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
943    ///
944    /// use moka::future::Cache;
945    ///
946    /// #[tokio::main]
947    /// async fn main() {
948    ///     let cache: Cache<String, u32> = Cache::new(100);
949    ///     let key = "key1".to_string();
950    ///
951    ///     let entry = cache.entry_by_ref(&key).or_insert(3).await;
952    ///     assert!(entry.is_fresh());
953    ///     assert_eq!(entry.key(), &key);
954    ///     assert_eq!(entry.into_value(), 3);
955    ///
956    ///     let entry = cache.entry_by_ref(&key).or_insert(6).await;
957    ///     // Not fresh because the value was already in the cache.
958    ///     assert!(!entry.is_fresh());
959    ///     assert_eq!(entry.into_value(), 3);
960    /// }
961    /// ```
962    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
963    where
964        K: Borrow<Q>,
965        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
966    {
967        let hash = self.base.hash(key);
968        RefKeyEntrySelector::new(key, hash, self)
969    }
970
971    /// Returns a _clone_ of the value corresponding to the key. If the value does
972    /// not exist, resolve the `init` future and inserts the output.
973    ///
974    /// # Concurrent calls on the same key
975    ///
976    /// This method guarantees that concurrent calls on the same not-existing key are
977    /// coalesced into one evaluation of the `init` future. Only one of the calls
978    /// evaluates its future, and other calls wait for that future to resolve.
979    ///
980    /// The following code snippet demonstrates this behavior:
981    ///
982    /// ```rust
983    /// // Cargo.toml
984    /// //
985    /// // [dependencies]
986    /// // moka = { version = "0.12", features = ["future"] }
987    /// // futures-util = "0.3"
988    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
989    /// use moka::future::Cache;
990    /// use std::sync::Arc;
991    ///
992    /// #[tokio::main]
993    /// async fn main() {
994    ///     const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
995    ///     let cache = Cache::new(100);
996    ///
997    ///     // Spawn four async tasks.
998    ///     let tasks: Vec<_> = (0..4_u8)
999    ///         .map(|task_id| {
1000    ///             let my_cache = cache.clone();
1001    ///             tokio::spawn(async move {
1002    ///                 println!("Task {task_id} started.");
1003    ///
1004    ///                 // Insert and get the value for key1. Although all four async
1005    ///                 // tasks will call `get_with` at the same time, the `init`
1006    ///                 // async block must be resolved only once.
1007    ///                 let value = my_cache
1008    ///                     .get_with("key1", async move {
1009    ///                         println!("Task {task_id} inserting a value.");
1010    ///                         Arc::new(vec![0u8; TEN_MIB])
1011    ///                     })
1012    ///                     .await;
1013    ///
1014    ///                 // Ensure the value exists now.
1015    ///                 assert_eq!(value.len(), TEN_MIB);
1016    ///                 assert!(my_cache.get(&"key1").await.is_some());
1017    ///
1018    ///                 println!("Task {task_id} got the value. (len: {})", value.len());
1019    ///             })
1020    ///         })
1021    ///         .collect();
1022    ///
1023    ///     // Run all tasks concurrently and wait for them to complete.
1024    ///     futures_util::future::join_all(tasks).await;
1025    /// }
1026    /// ```
1027    ///
1028    /// **A Sample Result**
1029    ///
1030    /// - The `init` future (async black) was resolved exactly once by task 3.
1031    /// - Other tasks were blocked until task 3 inserted the value.
1032    ///
1033    /// ```console
1034    /// Task 0 started.
1035    /// Task 3 started.
1036    /// Task 1 started.
1037    /// Task 2 started.
1038    /// Task 3 inserting a value.
1039    /// Task 3 got the value. (len: 10485760)
1040    /// Task 0 got the value. (len: 10485760)
1041    /// Task 1 got the value. (len: 10485760)
1042    /// Task 2 got the value. (len: 10485760)
1043    /// ```
1044    ///
1045    /// # Panics
1046    ///
1047    /// This method panics when the `init` future has panicked. When it happens, only
1048    /// the caller whose `init` future panicked will get the panic (e.g. only task 3
1049    /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1050    /// and 2 above), this method will restart and resolve one of the remaining
1051    /// `init` futures.
1052    ///
1053    pub async fn get_with(&self, key: K, init: impl Future<Output = V>) -> V {
1054        futures_util::pin_mut!(init);
1055        let hash = self.base.hash(&key);
1056        let key = Arc::new(key);
1057        let replace_if = None as Option<fn(&V) -> bool>;
1058        self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
1059            .await
1060            .into_value()
1061    }
1062
1063    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
1064    /// key, you can pass a reference to the key. If the key does not exist in the
1065    /// cache, the key will be cloned to create new entry in the cache.
1066    pub async fn get_with_by_ref<Q>(&self, key: &Q, init: impl Future<Output = V>) -> V
1067    where
1068        K: Borrow<Q>,
1069        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1070    {
1071        futures_util::pin_mut!(init);
1072        let hash = self.base.hash(key);
1073        let replace_if = None as Option<fn(&V) -> bool>;
1074        self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
1075            .await
1076            .into_value()
1077    }
1078
1079    /// TODO: Remove this in v0.13.0.
1080    /// Deprecated, replaced with
1081    /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
1082    #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
1083    pub async fn get_with_if(
1084        &self,
1085        key: K,
1086        init: impl Future<Output = V>,
1087        replace_if: impl FnMut(&V) -> bool + Send,
1088    ) -> V {
1089        futures_util::pin_mut!(init);
1090        let hash = self.base.hash(&key);
1091        let key = Arc::new(key);
1092        self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
1093            .await
1094            .into_value()
1095    }
1096
1097    /// Returns a _clone_ of the value corresponding to the key. If the value does
1098    /// not exist, resolves the `init` future, and inserts the value if `Some(value)`
1099    /// was returned. If `None` was returned from the future, this method does not
1100    /// insert a value and returns `None`.
1101    ///
1102    /// # Concurrent calls on the same key
1103    ///
1104    /// This method guarantees that concurrent calls on the same not-existing key are
1105    /// coalesced into one evaluation of the `init` future. Only one of the calls
1106    /// evaluates its future, and other calls wait for that future to resolve.
1107    ///
1108    /// The following code snippet demonstrates this behavior:
1109    ///
1110    /// ```rust
1111    /// // Cargo.toml
1112    /// //
1113    /// // [dependencies]
1114    /// // moka = { version = "0.12", features = ["future"] }
1115    /// // futures-util = "0.3"
1116    /// // reqwest = "0.11"
1117    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1118    /// use moka::future::Cache;
1119    ///
1120    /// // This async function tries to get HTML from the given URI.
1121    /// async fn get_html(task_id: u8, uri: &str) -> Option<String> {
1122    ///     println!("get_html() called by task {task_id}.");
1123    ///     reqwest::get(uri).await.ok()?.text().await.ok()
1124    /// }
1125    ///
1126    /// #[tokio::main]
1127    /// async fn main() {
1128    ///     let cache = Cache::new(100);
1129    ///
1130    ///     // Spawn four async tasks.
1131    ///     let tasks: Vec<_> = (0..4_u8)
1132    ///         .map(|task_id| {
1133    ///             let my_cache = cache.clone();
1134    ///             tokio::spawn(async move {
1135    ///                 println!("Task {task_id} started.");
1136    ///
1137    ///                 // Try to insert and get the value for key1. Although
1138    ///                 // all four async tasks will call `try_get_with`
1139    ///                 // at the same time, get_html() must be called only once.
1140    ///                 let value = my_cache
1141    ///                     .optionally_get_with(
1142    ///                         "key1",
1143    ///                         get_html(task_id, "https://www.rust-lang.org"),
1144    ///                     ).await;
1145    ///
1146    ///                 // Ensure the value exists now.
1147    ///                 assert!(value.is_some());
1148    ///                 assert!(my_cache.get(&"key1").await.is_some());
1149    ///
1150    ///                 println!(
1151    ///                     "Task {task_id} got the value. (len: {})",
1152    ///                     value.unwrap().len()
1153    ///                 );
1154    ///             })
1155    ///         })
1156    ///         .collect();
1157    ///
1158    ///     // Run all tasks concurrently and wait for them to complete.
1159    ///     futures_util::future::join_all(tasks).await;
1160    /// }
1161    /// ```
1162    ///
1163    /// **A Sample Result**
1164    ///
1165    /// - `get_html()` was called exactly once by task 2.
1166    /// - Other tasks were blocked until task 2 inserted the value.
1167    ///
1168    /// ```console
1169    /// Task 1 started.
1170    /// Task 0 started.
1171    /// Task 2 started.
1172    /// Task 3 started.
1173    /// get_html() called by task 2.
1174    /// Task 2 got the value. (len: 19419)
1175    /// Task 1 got the value. (len: 19419)
1176    /// Task 0 got the value. (len: 19419)
1177    /// Task 3 got the value. (len: 19419)
1178    /// ```
1179    ///
1180    /// # Panics
1181    ///
1182    /// This method panics when the `init` future has panicked. When it happens, only
1183    /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1184    /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1185    /// and 3 above), this method will restart and resolve one of the remaining
1186    /// `init` futures.
1187    ///
1188    pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1189    where
1190        F: Future<Output = Option<V>>,
1191    {
1192        futures_util::pin_mut!(init);
1193        let hash = self.base.hash(&key);
1194        let key = Arc::new(key);
1195        self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1196            .await
1197            .map(Entry::into_value)
1198    }
1199
1200    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1201    /// of passing an owned key, you can pass a reference to the key. If the key does
1202    /// not exist in the cache, the key will be cloned to create new entry in the
1203    /// cache.
1204    pub async fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1205    where
1206        F: Future<Output = Option<V>>,
1207        K: Borrow<Q>,
1208        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1209    {
1210        futures_util::pin_mut!(init);
1211        let hash = self.base.hash(key);
1212        self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1213            .await
1214            .map(Entry::into_value)
1215    }
1216
1217    /// Returns a _clone_ of the value corresponding to the key. If the value does
1218    /// not exist, resolves the `init` future, and inserts the value if `Ok(value)`
1219    /// was returned. If `Err(_)` was returned from the future, this method does not
1220    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1221    ///
1222    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1223    ///
1224    /// # Concurrent calls on the same key
1225    ///
1226    /// This method guarantees that concurrent calls on the same not-existing key are
1227    /// coalesced into one evaluation of the `init` future (as long as these
1228    /// futures return the same error type). Only one of the calls evaluates its
1229    /// future, and other calls wait for that future to resolve.
1230    ///
1231    /// The following code snippet demonstrates this behavior:
1232    ///
1233    /// ```rust
1234    /// // Cargo.toml
1235    /// //
1236    /// // [dependencies]
1237    /// // moka = { version = "0.12", features = ["future"] }
1238    /// // futures-util = "0.3"
1239    /// // reqwest = "0.11"
1240    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1241    /// use moka::future::Cache;
1242    ///
1243    /// // This async function tries to get HTML from the given URI.
1244    /// async fn get_html(task_id: u8, uri: &str) -> Result<String, reqwest::Error> {
1245    ///     println!("get_html() called by task {task_id}.");
1246    ///     reqwest::get(uri).await?.text().await
1247    /// }
1248    ///
1249    /// #[tokio::main]
1250    /// async fn main() {
1251    ///     let cache = Cache::new(100);
1252    ///
1253    ///     // Spawn four async tasks.
1254    ///     let tasks: Vec<_> = (0..4_u8)
1255    ///         .map(|task_id| {
1256    ///             let my_cache = cache.clone();
1257    ///             tokio::spawn(async move {
1258    ///                 println!("Task {task_id} started.");
1259    ///
1260    ///                 // Try to insert and get the value for key1. Although
1261    ///                 // all four async tasks will call `try_get_with`
1262    ///                 // at the same time, get_html() must be called only once.
1263    ///                 let value = my_cache
1264    ///                     .try_get_with(
1265    ///                         "key1",
1266    ///                         get_html(task_id, "https://www.rust-lang.org"),
1267    ///                     ).await;
1268    ///
1269    ///                 // Ensure the value exists now.
1270    ///                 assert!(value.is_ok());
1271    ///                 assert!(my_cache.get(&"key1").await.is_some());
1272    ///
1273    ///                 println!(
1274    ///                     "Task {task_id} got the value. (len: {})",
1275    ///                     value.unwrap().len()
1276    ///                 );
1277    ///             })
1278    ///         })
1279    ///         .collect();
1280    ///
1281    ///     // Run all tasks concurrently and wait for them to complete.
1282    ///     futures_util::future::join_all(tasks).await;
1283    /// }
1284    /// ```
1285    ///
1286    /// **A Sample Result**
1287    ///
1288    /// - `get_html()` was called exactly once by task 2.
1289    /// - Other tasks were blocked until task 2 inserted the value.
1290    ///
1291    /// ```console
1292    /// Task 1 started.
1293    /// Task 0 started.
1294    /// Task 2 started.
1295    /// Task 3 started.
1296    /// get_html() called by task 2.
1297    /// Task 2 got the value. (len: 19419)
1298    /// Task 1 got the value. (len: 19419)
1299    /// Task 0 got the value. (len: 19419)
1300    /// Task 3 got the value. (len: 19419)
1301    /// ```
1302    ///
1303    /// # Panics
1304    ///
1305    /// This method panics when the `init` future has panicked. When it happens, only
1306    /// the caller whose `init` future panicked will get the panic (e.g. only task 2
1307    /// in the above sample). If there are other calls in progress (e.g. task 0, 1
1308    /// and 3 above), this method will restart and resolve one of the remaining
1309    /// `init` futures.
1310    ///
1311    pub async fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1312    where
1313        F: Future<Output = Result<V, E>>,
1314        E: Send + Sync + 'static,
1315    {
1316        futures_util::pin_mut!(init);
1317        let hash = self.base.hash(&key);
1318        let key = Arc::new(key);
1319        self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1320            .await
1321            .map(Entry::into_value)
1322    }
1323
1324    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1325    /// owned key, you can pass a reference to the key. If the key does not exist in
1326    /// the cache, the key will be cloned to create new entry in the cache.
1327    pub async fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1328    where
1329        F: Future<Output = Result<V, E>>,
1330        E: Send + Sync + 'static,
1331        K: Borrow<Q>,
1332        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1333    {
1334        futures_util::pin_mut!(init);
1335        let hash = self.base.hash(key);
1336        self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1337            .await
1338            .map(Entry::into_value)
1339    }
1340
1341    /// Inserts a key-value pair into the cache.
1342    ///
1343    /// If the cache has this key present, the value is updated.
1344    pub async fn insert(&self, key: K, value: V) {
1345        let hash = self.base.hash(&key);
1346        let key = Arc::new(key);
1347        self.insert_with_hash(key, hash, value).await;
1348    }
1349
1350    /// Discards any cached value for the key.
1351    ///
1352    /// If you need to get the value that has been discarded, use the
1353    /// [`remove`](#method.remove) method instead.
1354    ///
1355    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1356    /// on the borrowed form _must_ match those for the key type.
1357    pub async fn invalidate<Q>(&self, key: &Q)
1358    where
1359        K: Borrow<Q>,
1360        Q: Hash + Eq + ?Sized,
1361    {
1362        let hash = self.base.hash(key);
1363        self.invalidate_with_hash(key, hash, false).await;
1364    }
1365
1366    /// Discards any cached value for the key and returns a _clone_ of the value.
1367    ///
1368    /// If you do not need to get the value that has been discarded, use the
1369    /// [`invalidate`](#method.invalidate) method instead.
1370    ///
1371    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1372    /// on the borrowed form _must_ match those for the key type.
1373    pub async fn remove<Q>(&self, key: &Q) -> Option<V>
1374    where
1375        K: Borrow<Q>,
1376        Q: Hash + Eq + ?Sized,
1377    {
1378        let hash = self.base.hash(key);
1379        self.invalidate_with_hash(key, hash, true).await
1380    }
1381
1382    /// Discards all cached values.
1383    ///
1384    /// This method returns immediately by just setting the current time as the
1385    /// invalidation time. `get` and other retrieval methods are guaranteed not to
1386    /// return the entries inserted before or at the invalidation time.
1387    ///
1388    /// The actual removal of the invalidated entries is done as a maintenance task
1389    /// driven by a user thread. For more details, see
1390    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1391    /// level documentation.
1392    ///
1393    /// Like the `invalidate` method, this method does not clear the historic
1394    /// popularity estimator of keys so that it retains the client activities of
1395    /// trying to retrieve an item.
1396    pub fn invalidate_all(&self) {
1397        self.base.invalidate_all();
1398    }
1399
1400    /// Discards cached values that satisfy a predicate.
1401    ///
1402    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
1403    /// closure is called against each cached entry inserted before or at the time
1404    /// when this method was called. If the closure returns `true` that entry will be
1405    /// evicted from the cache.
1406    ///
1407    /// This method returns immediately by not actually removing the invalidated
1408    /// entries. Instead, it just sets the predicate to the cache with the time when
1409    /// this method was called. The actual removal of the invalidated entries is done
1410    /// as a maintenance task driven by a user thread. For more details, see
1411    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
1412    /// level documentation.
1413    ///
1414    /// Also the `get` and other retrieval methods will apply the closure to a cached
1415    /// entry to determine if it should have been invalidated. Therefore, it is
1416    /// guaranteed that these methods must not return invalidated values.
1417    ///
1418    /// Note that you must call
1419    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
1420    /// at the cache creation time as the cache needs to maintain additional internal
1421    /// data structures to support this method. Otherwise, calling this method will
1422    /// fail with a
1423    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
1424    ///
1425    /// Like the `invalidate` method, this method does not clear the historic
1426    /// popularity estimator of keys so that it retains the client activities of
1427    /// trying to retrieve an item.
1428    ///
1429    /// [support-invalidation-closures]:
1430    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
1431    /// [invalidation-disabled-error]:
1432    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
1433    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
1434    where
1435        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
1436    {
1437        self.base.invalidate_entries_if(Arc::new(predicate))
1438    }
1439
1440    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
1441    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
1442    /// value.
1443    ///
1444    /// Iterators do not block concurrent reads and writes on the cache. An entry can
1445    /// be inserted to, invalidated or evicted from a cache while iterators are alive
1446    /// on the same cache.
1447    ///
1448    /// Unlike the `get` method, visiting entries via an iterator do not update the
1449    /// historic popularity estimator or reset idle timers for keys.
1450    ///
1451    /// # Guarantees
1452    ///
1453    /// In order to allow concurrent access to the cache, iterator's `next` method
1454    /// does _not_ guarantee the following:
1455    ///
1456    /// - It does not guarantee to return a key-value pair (an entry) if its key has
1457    ///   been inserted to the cache _after_ the iterator was created.
1458    ///   - Such an entry may or may not be returned depending on key's hash and
1459    ///     timing.
1460    ///
1461    /// and the `next` method guarantees the followings:
1462    ///
1463    /// - It guarantees not to return the same entry more than once.
1464    /// - It guarantees not to return an entry if it has been removed from the cache
1465    ///   after the iterator was created.
1466    ///     - Note: An entry can be removed by following reasons:
1467    ///         - Manually invalidated.
1468    ///         - Expired (e.g. time-to-live).
1469    ///         - Evicted as the cache capacity exceeded.
1470    ///
1471    /// # Examples
1472    ///
1473    /// ```rust
1474    /// // Cargo.toml
1475    /// //
1476    /// // [dependencies]
1477    /// // moka = { version = "0.12", features = ["future"] }
1478    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
1479    /// use moka::future::Cache;
1480    ///
1481    /// #[tokio::main]
1482    /// async fn main() {
1483    ///     let cache = Cache::new(100);
1484    ///     cache.insert("Julia", 14).await;
1485    ///
1486    ///     let mut iter = cache.iter();
1487    ///     let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
1488    ///     assert_eq!(*k, "Julia");
1489    ///     assert_eq!(v, 14);
1490    ///
1491    ///     assert!(iter.next().is_none());
1492    /// }
1493    /// ```
1494    ///
1495    pub fn iter(&self) -> Iter<'_, K, V> {
1496        use crate::sync_base::iter::{Iter as InnerIter, ScanningGet};
1497
1498        let inner = InnerIter::with_single_cache_segment(&self.base, self.base.num_cht_segments());
1499        Iter::new(inner)
1500    }
1501
1502    /// Performs any pending maintenance operations needed by the cache.
1503    pub async fn run_pending_tasks(&self) {
1504        if let Some(hk) = &self.base.housekeeper {
1505            self.base.retry_interrupted_ops().await;
1506            hk.run_pending_tasks(Arc::clone(&self.base.inner)).await;
1507        }
1508    }
1509}
1510
1511impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
1512where
1513    K: Hash + Eq + Send + Sync + 'static,
1514    V: Clone + Send + Sync + 'static,
1515    S: BuildHasher + Clone + Send + Sync + 'static,
1516{
1517    type Item = (Arc<K>, V);
1518
1519    type IntoIter = Iter<'a, K, V>;
1520
1521    fn into_iter(self) -> Self::IntoIter {
1522        self.iter()
1523    }
1524}
1525
1526//
1527// private methods
1528//
1529impl<K, V, S> Cache<K, V, S>
1530where
1531    K: Hash + Eq + Send + Sync + 'static,
1532    V: Clone + Send + Sync + 'static,
1533    S: BuildHasher + Clone + Send + Sync + 'static,
1534{
1535    pub(crate) async fn get_or_insert_with_hash_and_fun(
1536        &self,
1537        key: Arc<K>,
1538        hash: u64,
1539        init: Pin<&mut impl Future<Output = V>>,
1540        mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1541        need_key: bool,
1542    ) -> Entry<K, V> {
1543        let maybe_entry = self
1544            .base
1545            .get_with_hash(&key, hash, replace_if.as_mut(), need_key, true)
1546            .await;
1547        if let Some(entry) = maybe_entry {
1548            entry
1549        } else {
1550            self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1551                .await
1552        }
1553    }
1554
1555    pub(crate) async fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1556        &self,
1557        key: &Q,
1558        hash: u64,
1559        init: Pin<&mut impl Future<Output = V>>,
1560        mut replace_if: Option<impl FnMut(&V) -> bool + Send>,
1561        need_key: bool,
1562    ) -> Entry<K, V>
1563    where
1564        K: Borrow<Q>,
1565        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1566    {
1567        let maybe_entry = self
1568            .base
1569            .get_with_hash(key, hash, replace_if.as_mut(), need_key, true)
1570            .await;
1571        if let Some(entry) = maybe_entry {
1572            entry
1573        } else {
1574            let key = Arc::new(key.to_owned());
1575            self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1576                .await
1577        }
1578    }
1579
1580    async fn insert_with_hash_and_fun(
1581        &self,
1582        key: Arc<K>,
1583        hash: u64,
1584        init: Pin<&mut impl Future<Output = V>>,
1585        replace_if: Option<impl FnMut(&V) -> bool + Send>,
1586        need_key: bool,
1587    ) -> Entry<K, V> {
1588        let k = if need_key {
1589            Some(Arc::clone(&key))
1590        } else {
1591            None
1592        };
1593
1594        let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1595        let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1596
1597        match self
1598            .value_initializer
1599            .try_init_or_read(&key, hash, type_id, self, replace_if, init, post_init)
1600            .await
1601        {
1602            InitResult::Initialized(v) => {
1603                crossbeam_epoch::pin().flush();
1604                Entry::new(k, v, true, false)
1605            }
1606            InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1607            InitResult::InitErr(_) => unreachable!(),
1608        }
1609    }
1610
1611    pub(crate) async fn get_or_insert_with_hash(
1612        &self,
1613        key: Arc<K>,
1614        hash: u64,
1615        init: impl FnOnce() -> V,
1616    ) -> Entry<K, V> {
1617        match self
1618            .base
1619            .get_with_hash(&key, hash, never_ignore(), true, true)
1620            .await
1621        {
1622            Some(entry) => entry,
1623            None => {
1624                let value = init();
1625                self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1626                    .await;
1627                Entry::new(Some(key), value, true, false)
1628            }
1629        }
1630    }
1631
1632    pub(crate) async fn get_or_insert_with_hash_by_ref<Q>(
1633        &self,
1634        key: &Q,
1635        hash: u64,
1636        init: impl FnOnce() -> V,
1637    ) -> Entry<K, V>
1638    where
1639        K: Borrow<Q>,
1640        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1641    {
1642        match self
1643            .base
1644            .get_with_hash(key, hash, never_ignore(), true, true)
1645            .await
1646        {
1647            Some(entry) => entry,
1648            None => {
1649                let key = Arc::new(key.to_owned());
1650                let value = init();
1651                self.insert_with_hash(Arc::clone(&key), hash, value.clone())
1652                    .await;
1653                Entry::new(Some(key), value, true, false)
1654            }
1655        }
1656    }
1657
1658    pub(crate) async fn get_or_optionally_insert_with_hash_and_fun<F>(
1659        &self,
1660        key: Arc<K>,
1661        hash: u64,
1662        init: Pin<&mut F>,
1663        need_key: bool,
1664    ) -> Option<Entry<K, V>>
1665    where
1666        F: Future<Output = Option<V>>,
1667    {
1668        let entry = self
1669            .base
1670            .get_with_hash(&key, hash, never_ignore(), need_key, true)
1671            .await;
1672        if entry.is_some() {
1673            return entry;
1674        }
1675
1676        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1677            .await
1678    }
1679
1680    pub(crate) async fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1681        &self,
1682        key: &Q,
1683        hash: u64,
1684        init: Pin<&mut F>,
1685        need_key: bool,
1686    ) -> Option<Entry<K, V>>
1687    where
1688        F: Future<Output = Option<V>>,
1689        K: Borrow<Q>,
1690        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1691    {
1692        let entry = self
1693            .base
1694            .get_with_hash(key, hash, never_ignore(), need_key, true)
1695            .await;
1696        if entry.is_some() {
1697            return entry;
1698        }
1699
1700        let key = Arc::new(key.to_owned());
1701        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1702            .await
1703    }
1704
1705    async fn optionally_insert_with_hash_and_fun<F>(
1706        &self,
1707        key: Arc<K>,
1708        hash: u64,
1709        init: Pin<&mut F>,
1710        need_key: bool,
1711    ) -> Option<Entry<K, V>>
1712    where
1713        F: Future<Output = Option<V>>,
1714    {
1715        let k = if need_key {
1716            Some(Arc::clone(&key))
1717        } else {
1718            None
1719        };
1720
1721        let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1722        let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1723
1724        match self
1725            .value_initializer
1726            .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1727            .await
1728        {
1729            InitResult::Initialized(v) => {
1730                crossbeam_epoch::pin().flush();
1731                Some(Entry::new(k, v, true, false))
1732            }
1733            InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1734            InitResult::InitErr(_) => None,
1735        }
1736    }
1737
1738    pub(super) async fn get_or_try_insert_with_hash_and_fun<F, E>(
1739        &self,
1740        key: Arc<K>,
1741        hash: u64,
1742        init: Pin<&mut F>,
1743        need_key: bool,
1744    ) -> Result<Entry<K, V>, Arc<E>>
1745    where
1746        F: Future<Output = Result<V, E>>,
1747        E: Send + Sync + 'static,
1748    {
1749        if let Some(entry) = self
1750            .base
1751            .get_with_hash(&key, hash, never_ignore(), need_key, true)
1752            .await
1753        {
1754            return Ok(entry);
1755        }
1756
1757        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1758            .await
1759    }
1760
1761    pub(super) async fn get_or_try_insert_with_hash_by_ref_and_fun<F, E, Q>(
1762        &self,
1763        key: &Q,
1764        hash: u64,
1765        init: Pin<&mut F>,
1766        need_key: bool,
1767    ) -> Result<Entry<K, V>, Arc<E>>
1768    where
1769        F: Future<Output = Result<V, E>>,
1770        E: Send + Sync + 'static,
1771        K: Borrow<Q>,
1772        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1773    {
1774        if let Some(entry) = self
1775            .base
1776            .get_with_hash(key, hash, never_ignore(), need_key, true)
1777            .await
1778        {
1779            return Ok(entry);
1780        }
1781        let key = Arc::new(key.to_owned());
1782        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1783            .await
1784    }
1785
1786    async fn try_insert_with_hash_and_fun<F, E>(
1787        &self,
1788        key: Arc<K>,
1789        hash: u64,
1790        init: Pin<&mut F>,
1791        need_key: bool,
1792    ) -> Result<Entry<K, V>, Arc<E>>
1793    where
1794        F: Future<Output = Result<V, E>>,
1795        E: Send + Sync + 'static,
1796    {
1797        let k = if need_key {
1798            Some(Arc::clone(&key))
1799        } else {
1800            None
1801        };
1802
1803        let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1804        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1805
1806        match self
1807            .value_initializer
1808            .try_init_or_read(&key, hash, type_id, self, never_ignore(), init, post_init)
1809            .await
1810        {
1811            InitResult::Initialized(v) => {
1812                crossbeam_epoch::pin().flush();
1813                Ok(Entry::new(k, v, true, false))
1814            }
1815            InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1816            InitResult::InitErr(e) => {
1817                crossbeam_epoch::pin().flush();
1818                Err(e)
1819            }
1820        }
1821    }
1822
1823    pub(crate) async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
1824        if self.base.is_map_disabled() {
1825            return;
1826        }
1827
1828        let (op, ts) = self.base.do_insert_with_hash(key, hash, value).await;
1829        let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, ts);
1830        cancel_guard.set_op(op.clone());
1831
1832        let should_block;
1833        #[cfg(not(test))]
1834        {
1835            should_block = false;
1836        }
1837        #[cfg(test)]
1838        {
1839            should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
1840        }
1841
1842        let hk = self.base.housekeeper.as_ref();
1843        let event = self.base.write_op_ch_ready_event();
1844
1845        BaseCache::<K, V, S>::schedule_write_op(
1846            &self.base.inner,
1847            &self.base.write_op_ch,
1848            event,
1849            op,
1850            ts,
1851            hk,
1852            should_block,
1853        )
1854        .await
1855        .expect("Failed to schedule write op for insert");
1856        cancel_guard.clear();
1857    }
1858
1859    pub(crate) async fn compute_with_hash_and_fun<F, Fut>(
1860        &self,
1861        key: Arc<K>,
1862        hash: u64,
1863        f: F,
1864    ) -> compute::CompResult<K, V>
1865    where
1866        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1867        Fut: Future<Output = compute::Op<V>>,
1868    {
1869        let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1870        match self
1871            .value_initializer
1872            .try_compute(key, hash, self, f, post_init, true)
1873            .await
1874        {
1875            Ok(result) => result,
1876            Err(_) => unreachable!(),
1877        }
1878    }
1879
1880    pub(crate) async fn try_compute_with_hash_and_fun<F, Fut, E>(
1881        &self,
1882        key: Arc<K>,
1883        hash: u64,
1884        f: F,
1885    ) -> Result<compute::CompResult<K, V>, E>
1886    where
1887        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1888        Fut: Future<Output = Result<compute::Op<V>, E>>,
1889        E: Send + Sync + 'static,
1890    {
1891        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1892        self.value_initializer
1893            .try_compute(key, hash, self, f, post_init, true)
1894            .await
1895    }
1896
1897    pub(crate) async fn try_compute_if_nobody_else_with_hash_and_fun<F, Fut, E>(
1898        &self,
1899        key: Arc<K>,
1900        hash: u64,
1901        f: F,
1902    ) -> Result<compute::CompResult<K, V>, E>
1903    where
1904        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1905        Fut: Future<Output = Result<compute::Op<V>, E>>,
1906        E: Send + Sync + 'static,
1907    {
1908        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with_if_nobody_else;
1909        self.value_initializer
1910            .try_compute_if_nobody_else(key, hash, self, f, post_init, true)
1911            .await
1912    }
1913
1914    pub(crate) async fn upsert_with_hash_and_fun<F, Fut>(
1915        &self,
1916        key: Arc<K>,
1917        hash: u64,
1918        f: F,
1919    ) -> Entry<K, V>
1920    where
1921        F: FnOnce(Option<Entry<K, V>>) -> Fut,
1922        Fut: Future<Output = V>,
1923    {
1924        let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1925        match self
1926            .value_initializer
1927            .try_compute(key, hash, self, f, post_init, false)
1928            .await
1929        {
1930            Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1931            _ => unreachable!(),
1932        }
1933    }
1934
1935    pub(crate) async fn invalidate_with_hash<Q>(
1936        &self,
1937        key: &Q,
1938        hash: u64,
1939        need_value: bool,
1940    ) -> Option<V>
1941    where
1942        K: Borrow<Q>,
1943        Q: Hash + Eq + ?Sized,
1944    {
1945        use futures_util::FutureExt;
1946
1947        self.base.retry_interrupted_ops().await;
1948
1949        // Lock the key for removal if blocking removal notification is enabled.
1950        let mut kl = None;
1951        let mut klg = None;
1952        if self.base.is_removal_notifier_enabled() {
1953            // To lock the key, we have to get Arc<K> for key (&Q).
1954            //
1955            // TODO: Enhance this if possible. This is rather hack now because
1956            // it cannot prevent race conditions like this:
1957            //
1958            // 1. We miss the key because it does not exist. So we do not lock
1959            //    the key.
1960            // 2. Somebody else (other thread) inserts the key.
1961            // 3. We remove the entry for the key, but without the key lock!
1962            //
1963            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
1964                kl = self.base.maybe_key_lock(&arc_key);
1965                klg = if let Some(lock) = &kl {
1966                    Some(lock.lock().await)
1967                } else {
1968                    None
1969                };
1970            }
1971        }
1972
1973        match self.base.remove_entry(key, hash) {
1974            None => None,
1975            Some(kv) => {
1976                let now = self.base.current_time();
1977
1978                let maybe_v = if need_value {
1979                    Some(kv.entry.value.clone())
1980                } else {
1981                    None
1982                };
1983
1984                let info = kv.entry.entry_info();
1985                let entry_gen = info.incr_entry_gen();
1986
1987                let op: WriteOp<K, V> = WriteOp::Remove {
1988                    kv_entry: kv.clone(),
1989                    entry_gen,
1990                };
1991
1992                // Async Cancellation Safety: To ensure the below future should be
1993                // executed even if our caller async task is cancelled, we create a
1994                // cancel guard for the future (and the op). If our caller is
1995                // cancelled while we are awaiting for the future, the cancel guard
1996                // will save the future and the op to the interrupted_op_ch channel,
1997                // so that we can resume/retry later.
1998                let mut cancel_guard = CancelGuard::new(&self.base.interrupted_op_ch_snd, now);
1999
2000                if self.base.is_removal_notifier_enabled() {
2001                    let future = self
2002                        .base
2003                        .notify_invalidate(&kv.key, &kv.entry)
2004                        .boxed()
2005                        .shared();
2006                    cancel_guard.set_future_and_op(future.clone(), op.clone());
2007                    // Send notification to the eviction listener.
2008                    future.await;
2009                    cancel_guard.unset_future();
2010                } else {
2011                    cancel_guard.set_op(op.clone());
2012                }
2013
2014                // Drop the locks before scheduling write op to avoid a potential
2015                // dead lock. (Scheduling write can do spin lock when the queue is
2016                // full, and queue will be drained by the housekeeping thread that
2017                // can lock the same key)
2018                std::mem::drop(klg);
2019                std::mem::drop(kl);
2020
2021                let should_block;
2022                #[cfg(not(test))]
2023                {
2024                    should_block = false;
2025                }
2026                #[cfg(test)]
2027                {
2028                    should_block = self.schedule_write_op_should_block.load(Ordering::Acquire);
2029                }
2030
2031                let event = self.base.write_op_ch_ready_event();
2032                let hk = self.base.housekeeper.as_ref();
2033
2034                BaseCache::<K, V, S>::schedule_write_op(
2035                    &self.base.inner,
2036                    &self.base.write_op_ch,
2037                    event,
2038                    op,
2039                    now,
2040                    hk,
2041                    should_block,
2042                )
2043                .await
2044                .expect("Failed to schedule write op for remove");
2045                cancel_guard.clear();
2046
2047                crossbeam_epoch::pin().flush();
2048                maybe_v
2049            }
2050        }
2051    }
2052}
2053
2054// For unit tests.
2055// For unit tests.
2056#[cfg(test)]
2057impl<K, V, S> Cache<K, V, S> {
2058    pub(crate) fn is_table_empty(&self) -> bool {
2059        self.entry_count() == 0
2060    }
2061
2062    pub(crate) fn is_waiter_map_empty(&self) -> bool {
2063        self.value_initializer.waiter_count() == 0
2064    }
2065}
2066
2067#[cfg(test)]
2068impl<K, V, S> Cache<K, V, S>
2069where
2070    K: Hash + Eq + Send + Sync + 'static,
2071    V: Clone + Send + Sync + 'static,
2072    S: BuildHasher + Clone + Send + Sync + 'static,
2073{
2074    fn invalidation_predicate_count(&self) -> usize {
2075        self.base.invalidation_predicate_count()
2076    }
2077
2078    async fn reconfigure_for_testing(&mut self) {
2079        self.base.reconfigure_for_testing().await;
2080    }
2081
2082    fn key_locks_map_is_empty(&self) -> bool {
2083        self.base.key_locks_map_is_empty()
2084    }
2085
2086    fn run_pending_tasks_initiation_count(&self) -> usize {
2087        self.base
2088            .housekeeper
2089            .as_ref()
2090            .map(|hk| hk.start_count.load(Ordering::Acquire))
2091            .expect("housekeeper is not set")
2092    }
2093
2094    fn run_pending_tasks_completion_count(&self) -> usize {
2095        self.base
2096            .housekeeper
2097            .as_ref()
2098            .map(|hk| hk.complete_count.load(Ordering::Acquire))
2099            .expect("housekeeper is not set")
2100    }
2101}
2102
2103// AS of Rust 1.71, we cannot make this function into a `const fn` because mutable
2104// references are not allowed.
2105// See [#57349](https://github.com/rust-lang/rust/issues/57349).
2106#[inline]
2107fn never_ignore<'a, V>() -> Option<&'a mut fn(&V) -> bool> {
2108    None
2109}
2110
2111// To see the debug prints, run test as `cargo test -- --nocapture`
2112#[cfg(test)]
2113mod tests {
2114    use super::Cache;
2115    use crate::{
2116        common::{time::Clock, HousekeeperConfig},
2117        future::FutureExt,
2118        notification::{ListenerFuture, RemovalCause},
2119        ops::compute,
2120        policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
2121        Expiry,
2122    };
2123
2124    use async_lock::{Barrier, Mutex};
2125    use std::{
2126        convert::Infallible,
2127        sync::{
2128            atomic::{AtomicU32, AtomicU8, Ordering},
2129            Arc,
2130        },
2131        time::{Duration, Instant as StdInstant},
2132        vec,
2133    };
2134    use tokio::time::sleep;
2135
2136    #[test]
2137    fn futures_are_send() {
2138        let cache = Cache::new(0);
2139
2140        fn is_send(_: impl Send) {}
2141
2142        // pub fns
2143        is_send(cache.get(&()));
2144        is_send(cache.get_with((), async {}));
2145        is_send(cache.get_with_by_ref(&(), async {}));
2146        #[allow(deprecated)]
2147        is_send(cache.get_with_if((), async {}, |_| false));
2148        is_send(cache.insert((), ()));
2149        is_send(cache.invalidate(&()));
2150        is_send(cache.optionally_get_with((), async { None }));
2151        is_send(cache.optionally_get_with_by_ref(&(), async { None }));
2152        is_send(cache.remove(&()));
2153        is_send(cache.run_pending_tasks());
2154        is_send(cache.try_get_with((), async { Err(()) }));
2155        is_send(cache.try_get_with_by_ref(&(), async { Err(()) }));
2156
2157        // entry fns
2158        is_send(
2159            cache
2160                .entry(())
2161                .and_compute_with(|_| async { compute::Op::Nop }),
2162        );
2163        is_send(
2164            cache
2165                .entry(())
2166                .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
2167        );
2168        is_send(cache.entry(()).and_upsert_with(|_| async {}));
2169        is_send(cache.entry(()).or_default());
2170        is_send(cache.entry(()).or_insert(()));
2171        is_send(cache.entry(()).or_insert_with(async {}));
2172        is_send(cache.entry(()).or_insert_with_if(async {}, |_| false));
2173        is_send(cache.entry(()).or_optionally_insert_with(async { None }));
2174        is_send(cache.entry(()).or_try_insert_with(async { Err(()) }));
2175
2176        // entry_by_ref fns
2177        is_send(
2178            cache
2179                .entry_by_ref(&())
2180                .and_compute_with(|_| async { compute::Op::Nop }),
2181        );
2182        is_send(
2183            cache
2184                .entry_by_ref(&())
2185                .and_try_compute_with(|_| async { Ok(compute::Op::Nop) as Result<_, Infallible> }),
2186        );
2187        is_send(cache.entry_by_ref(&()).and_upsert_with(|_| async {}));
2188        is_send(cache.entry_by_ref(&()).or_default());
2189        is_send(cache.entry_by_ref(&()).or_insert(()));
2190        is_send(cache.entry_by_ref(&()).or_insert_with(async {}));
2191        is_send(
2192            cache
2193                .entry_by_ref(&())
2194                .or_insert_with_if(async {}, |_| false),
2195        );
2196        is_send(
2197            cache
2198                .entry_by_ref(&())
2199                .or_optionally_insert_with(async { None }),
2200        );
2201        is_send(
2202            cache
2203                .entry_by_ref(&())
2204                .or_try_insert_with(async { Err(()) }),
2205        );
2206    }
2207
2208    #[tokio::test]
2209    async fn max_capacity_zero() {
2210        let mut cache = Cache::new(0);
2211        cache.reconfigure_for_testing().await;
2212
2213        // Make the cache exterior immutable.
2214        let cache = cache;
2215
2216        cache.insert(0, ()).await;
2217
2218        assert!(!cache.contains_key(&0));
2219        assert!(cache.get(&0).await.is_none());
2220        cache.run_pending_tasks().await;
2221        assert!(!cache.contains_key(&0));
2222        assert!(cache.get(&0).await.is_none());
2223        assert_eq!(cache.entry_count(), 0)
2224    }
2225
2226    #[tokio::test]
2227    async fn basic_single_async_task() {
2228        // The following `Vec`s will hold actual and expected notifications.
2229        let actual = Arc::new(Mutex::new(Vec::new()));
2230        let mut expected = Vec::new();
2231
2232        // Create an eviction listener.
2233        let a1 = Arc::clone(&actual);
2234        let listener = move |k, v, cause| -> ListenerFuture {
2235            let a2 = Arc::clone(&a1);
2236            async move {
2237                a2.lock().await.push((k, v, cause));
2238            }
2239            .boxed()
2240        };
2241
2242        // Create a cache with the eviction listener.
2243        let mut cache = Cache::builder()
2244            .max_capacity(3)
2245            .async_eviction_listener(listener)
2246            .build();
2247        cache.reconfigure_for_testing().await;
2248
2249        // Make the cache exterior immutable.
2250        let cache = cache;
2251
2252        cache.insert("a", "alice").await;
2253        cache.insert("b", "bob").await;
2254        assert_eq!(cache.get(&"a").await, Some("alice"));
2255        assert!(cache.contains_key(&"a"));
2256        assert!(cache.contains_key(&"b"));
2257        assert_eq!(cache.get(&"b").await, Some("bob"));
2258        cache.run_pending_tasks().await;
2259        // counts: a -> 1, b -> 1
2260
2261        cache.insert("c", "cindy").await;
2262        assert_eq!(cache.get(&"c").await, Some("cindy"));
2263        assert!(cache.contains_key(&"c"));
2264        // counts: a -> 1, b -> 1, c -> 1
2265        cache.run_pending_tasks().await;
2266
2267        assert!(cache.contains_key(&"a"));
2268        assert_eq!(cache.get(&"a").await, Some("alice"));
2269        assert_eq!(cache.get(&"b").await, Some("bob"));
2270        assert!(cache.contains_key(&"b"));
2271        cache.run_pending_tasks().await;
2272        // counts: a -> 2, b -> 2, c -> 1
2273
2274        // "d" should not be admitted because its frequency is too low.
2275        cache.insert("d", "david").await; //   count: d -> 0
2276        expected.push((Arc::new("d"), "david", RemovalCause::Size));
2277        cache.run_pending_tasks().await;
2278        assert_eq!(cache.get(&"d").await, None); //   d -> 1
2279        assert!(!cache.contains_key(&"d"));
2280
2281        cache.insert("d", "david").await;
2282        expected.push((Arc::new("d"), "david", RemovalCause::Size));
2283        cache.run_pending_tasks().await;
2284        assert!(!cache.contains_key(&"d"));
2285        assert_eq!(cache.get(&"d").await, None); //   d -> 2
2286
2287        // "d" should be admitted and "c" should be evicted
2288        // because d's frequency is higher than c's.
2289        cache.insert("d", "dennis").await;
2290        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2291        cache.run_pending_tasks().await;
2292        assert_eq!(cache.get(&"a").await, Some("alice"));
2293        assert_eq!(cache.get(&"b").await, Some("bob"));
2294        assert_eq!(cache.get(&"c").await, None);
2295        assert_eq!(cache.get(&"d").await, Some("dennis"));
2296        assert!(cache.contains_key(&"a"));
2297        assert!(cache.contains_key(&"b"));
2298        assert!(!cache.contains_key(&"c"));
2299        assert!(cache.contains_key(&"d"));
2300
2301        cache.invalidate(&"b").await;
2302        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2303        cache.run_pending_tasks().await;
2304        assert_eq!(cache.get(&"b").await, None);
2305        assert!(!cache.contains_key(&"b"));
2306
2307        assert!(cache.remove(&"b").await.is_none());
2308        assert_eq!(cache.remove(&"d").await, Some("dennis"));
2309        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
2310        cache.run_pending_tasks().await;
2311        assert_eq!(cache.get(&"d").await, None);
2312        assert!(!cache.contains_key(&"d"));
2313
2314        verify_notification_vec(&cache, actual, &expected).await;
2315        assert!(cache.key_locks_map_is_empty());
2316    }
2317
2318    #[tokio::test]
2319    async fn basic_lru_single_thread() {
2320        // The following `Vec`s will hold actual and expected notifications.
2321        let actual = Arc::new(Mutex::new(Vec::new()));
2322        let mut expected = Vec::new();
2323
2324        // Create an eviction listener.
2325        let a1 = Arc::clone(&actual);
2326        let listener = move |k, v, cause| -> ListenerFuture {
2327            let a2 = Arc::clone(&a1);
2328            async move {
2329                a2.lock().await.push((k, v, cause));
2330            }
2331            .boxed()
2332        };
2333
2334        // Create a cache with the eviction listener.
2335        let mut cache = Cache::builder()
2336            .max_capacity(3)
2337            .eviction_policy(EvictionPolicy::lru())
2338            .async_eviction_listener(listener)
2339            .build();
2340        cache.reconfigure_for_testing().await;
2341
2342        // Make the cache exterior immutable.
2343        let cache = cache;
2344
2345        cache.insert("a", "alice").await;
2346        cache.insert("b", "bob").await;
2347        assert_eq!(cache.get(&"a").await, Some("alice"));
2348        assert!(cache.contains_key(&"a"));
2349        assert!(cache.contains_key(&"b"));
2350        assert_eq!(cache.get(&"b").await, Some("bob"));
2351        cache.run_pending_tasks().await;
2352        // a -> b
2353
2354        cache.insert("c", "cindy").await;
2355        assert_eq!(cache.get(&"c").await, Some("cindy"));
2356        assert!(cache.contains_key(&"c"));
2357        cache.run_pending_tasks().await;
2358        // a -> b -> c
2359
2360        assert!(cache.contains_key(&"a"));
2361        assert_eq!(cache.get(&"a").await, Some("alice"));
2362        assert_eq!(cache.get(&"b").await, Some("bob"));
2363        assert!(cache.contains_key(&"b"));
2364        cache.run_pending_tasks().await;
2365        // c -> a -> b
2366
2367        // "d" should be admitted because the cache uses the LRU strategy.
2368        cache.insert("d", "david").await;
2369        // "c" is the LRU and should have be evicted.
2370        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
2371        cache.run_pending_tasks().await;
2372
2373        assert_eq!(cache.get(&"a").await, Some("alice"));
2374        assert_eq!(cache.get(&"b").await, Some("bob"));
2375        assert_eq!(cache.get(&"c").await, None);
2376        assert_eq!(cache.get(&"d").await, Some("david"));
2377        assert!(cache.contains_key(&"a"));
2378        assert!(cache.contains_key(&"b"));
2379        assert!(!cache.contains_key(&"c"));
2380        assert!(cache.contains_key(&"d"));
2381        cache.run_pending_tasks().await;
2382        // a -> b -> d
2383
2384        cache.invalidate(&"b").await;
2385        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2386        cache.run_pending_tasks().await;
2387        // a -> d
2388        assert_eq!(cache.get(&"b").await, None);
2389        assert!(!cache.contains_key(&"b"));
2390
2391        assert!(cache.remove(&"b").await.is_none());
2392        assert_eq!(cache.remove(&"d").await, Some("david"));
2393        expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
2394        cache.run_pending_tasks().await;
2395        // a
2396        assert_eq!(cache.get(&"d").await, None);
2397        assert!(!cache.contains_key(&"d"));
2398
2399        cache.insert("e", "emily").await;
2400        cache.insert("f", "frank").await;
2401        // "a" should be evicted because it is the LRU.
2402        cache.insert("g", "gina").await;
2403        expected.push((Arc::new("a"), "alice", RemovalCause::Size));
2404        cache.run_pending_tasks().await;
2405        // e -> f -> g
2406        assert_eq!(cache.get(&"a").await, None);
2407        assert_eq!(cache.get(&"e").await, Some("emily"));
2408        assert_eq!(cache.get(&"f").await, Some("frank"));
2409        assert_eq!(cache.get(&"g").await, Some("gina"));
2410        assert!(!cache.contains_key(&"a"));
2411        assert!(cache.contains_key(&"e"));
2412        assert!(cache.contains_key(&"f"));
2413        assert!(cache.contains_key(&"g"));
2414
2415        verify_notification_vec(&cache, actual, &expected).await;
2416        assert!(cache.key_locks_map_is_empty());
2417    }
2418
2419    #[tokio::test]
2420    async fn size_aware_eviction() {
2421        let weigher = |_k: &&str, v: &(&str, u32)| v.1;
2422
2423        let alice = ("alice", 10);
2424        let bob = ("bob", 15);
2425        let bill = ("bill", 20);
2426        let cindy = ("cindy", 5);
2427        let david = ("david", 15);
2428        let dennis = ("dennis", 15);
2429
2430        // The following `Vec`s will hold actual and expected notifications.
2431        let actual = Arc::new(Mutex::new(Vec::new()));
2432        let mut expected = Vec::new();
2433
2434        // Create an eviction listener.
2435        let a1 = Arc::clone(&actual);
2436        let listener = move |k, v, cause| -> ListenerFuture {
2437            let a2 = Arc::clone(&a1);
2438            async move {
2439                a2.lock().await.push((k, v, cause));
2440            }
2441            .boxed()
2442        };
2443
2444        // Create a cache with the eviction listener.
2445        let mut cache = Cache::builder()
2446            .max_capacity(31)
2447            .weigher(weigher)
2448            .async_eviction_listener(listener)
2449            .build();
2450        cache.reconfigure_for_testing().await;
2451
2452        // Make the cache exterior immutable.
2453        let cache = cache;
2454
2455        cache.insert("a", alice).await;
2456        cache.insert("b", bob).await;
2457        assert_eq!(cache.get(&"a").await, Some(alice));
2458        assert!(cache.contains_key(&"a"));
2459        assert!(cache.contains_key(&"b"));
2460        assert_eq!(cache.get(&"b").await, Some(bob));
2461        cache.run_pending_tasks().await;
2462        // order (LRU -> MRU) and counts: a -> 1, b -> 1
2463
2464        cache.insert("c", cindy).await;
2465        assert_eq!(cache.get(&"c").await, Some(cindy));
2466        assert!(cache.contains_key(&"c"));
2467        // order and counts: a -> 1, b -> 1, c -> 1
2468        cache.run_pending_tasks().await;
2469
2470        assert!(cache.contains_key(&"a"));
2471        assert_eq!(cache.get(&"a").await, Some(alice));
2472        assert_eq!(cache.get(&"b").await, Some(bob));
2473        assert!(cache.contains_key(&"b"));
2474        cache.run_pending_tasks().await;
2475        // order and counts: c -> 1, a -> 2, b -> 2
2476
2477        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
2478        // "d" must have higher count than 3, which is the aggregated count
2479        // of "a" and "c".
2480        cache.insert("d", david).await; //   count: d -> 0
2481        expected.push((Arc::new("d"), david, RemovalCause::Size));
2482        cache.run_pending_tasks().await;
2483        assert_eq!(cache.get(&"d").await, None); //   d -> 1
2484        assert!(!cache.contains_key(&"d"));
2485
2486        cache.insert("d", david).await;
2487        expected.push((Arc::new("d"), david, RemovalCause::Size));
2488        cache.run_pending_tasks().await;
2489        assert!(!cache.contains_key(&"d"));
2490        assert_eq!(cache.get(&"d").await, None); //   d -> 2
2491
2492        cache.insert("d", david).await;
2493        expected.push((Arc::new("d"), david, RemovalCause::Size));
2494        cache.run_pending_tasks().await;
2495        assert_eq!(cache.get(&"d").await, None); //   d -> 3
2496        assert!(!cache.contains_key(&"d"));
2497
2498        cache.insert("d", david).await;
2499        expected.push((Arc::new("d"), david, RemovalCause::Size));
2500        cache.run_pending_tasks().await;
2501        assert!(!cache.contains_key(&"d"));
2502        assert_eq!(cache.get(&"d").await, None); //   d -> 4
2503
2504        // Finally "d" should be admitted by evicting "c" and "a".
2505        cache.insert("d", dennis).await;
2506        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
2507        expected.push((Arc::new("a"), alice, RemovalCause::Size));
2508        cache.run_pending_tasks().await;
2509        assert_eq!(cache.get(&"a").await, None);
2510        assert_eq!(cache.get(&"b").await, Some(bob));
2511        assert_eq!(cache.get(&"c").await, None);
2512        assert_eq!(cache.get(&"d").await, Some(dennis));
2513        assert!(!cache.contains_key(&"a"));
2514        assert!(cache.contains_key(&"b"));
2515        assert!(!cache.contains_key(&"c"));
2516        assert!(cache.contains_key(&"d"));
2517
2518        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
2519        cache.insert("b", bill).await;
2520        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
2521        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
2522        cache.run_pending_tasks().await;
2523        assert_eq!(cache.get(&"b").await, Some(bill));
2524        assert_eq!(cache.get(&"d").await, None);
2525        assert!(cache.contains_key(&"b"));
2526        assert!(!cache.contains_key(&"d"));
2527
2528        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
2529        cache.insert("a", alice).await;
2530        cache.insert("b", bob).await;
2531        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
2532        cache.run_pending_tasks().await;
2533        assert_eq!(cache.get(&"a").await, Some(alice));
2534        assert_eq!(cache.get(&"b").await, Some(bob));
2535        assert_eq!(cache.get(&"d").await, None);
2536        assert!(cache.contains_key(&"a"));
2537        assert!(cache.contains_key(&"b"));
2538        assert!(!cache.contains_key(&"d"));
2539
2540        // Verify the sizes.
2541        assert_eq!(cache.entry_count(), 2);
2542        assert_eq!(cache.weighted_size(), 25);
2543
2544        verify_notification_vec(&cache, actual, &expected).await;
2545        assert!(cache.key_locks_map_is_empty());
2546    }
2547
2548    #[tokio::test]
2549    async fn basic_multi_async_tasks() {
2550        let num_tasks = 2;
2551        let num_threads = 2;
2552
2553        let cache = Cache::new(100);
2554        let barrier = Arc::new(Barrier::new(num_tasks + num_threads as usize));
2555
2556        let tasks = (0..num_tasks)
2557            .map(|id| {
2558                let cache = cache.clone();
2559                let barrier = Arc::clone(&barrier);
2560
2561                tokio::spawn(async move {
2562                    barrier.wait().await;
2563
2564                    cache.insert(10, format!("{id}-100")).await;
2565                    cache.get(&10).await;
2566                    cache.insert(20, format!("{id}-200")).await;
2567                    cache.invalidate(&10).await;
2568                })
2569            })
2570            .collect::<Vec<_>>();
2571
2572        let threads = (0..num_threads)
2573            .map(|id| {
2574                let cache = cache.clone();
2575                let barrier = Arc::clone(&barrier);
2576                let rt = tokio::runtime::Handle::current();
2577
2578                std::thread::spawn(move || {
2579                    rt.block_on(barrier.wait());
2580
2581                    rt.block_on(cache.insert(10, format!("{id}-100")));
2582                    rt.block_on(cache.get(&10));
2583                    rt.block_on(cache.insert(20, format!("{id}-200")));
2584                    rt.block_on(cache.invalidate(&10));
2585                })
2586            })
2587            .collect::<Vec<_>>();
2588
2589        let _ = futures_util::future::join_all(tasks).await;
2590        threads.into_iter().for_each(|t| t.join().unwrap());
2591
2592        assert!(cache.get(&10).await.is_none());
2593        assert!(cache.get(&20).await.is_some());
2594        assert!(!cache.contains_key(&10));
2595        assert!(cache.contains_key(&20));
2596    }
2597
2598    #[tokio::test]
2599    async fn invalidate_all() {
2600        // The following `Vec`s will hold actual and expected notifications.
2601        let actual = Arc::new(Mutex::new(Vec::new()));
2602        let mut expected = Vec::new();
2603
2604        // Create an eviction listener.
2605        let a1 = Arc::clone(&actual);
2606        let listener = move |k, v, cause| -> ListenerFuture {
2607            let a2 = Arc::clone(&a1);
2608            async move {
2609                a2.lock().await.push((k, v, cause));
2610            }
2611            .boxed()
2612        };
2613
2614        // Create a cache with the eviction listener.
2615        let mut cache = Cache::builder()
2616            .max_capacity(100)
2617            .async_eviction_listener(listener)
2618            .build();
2619        cache.reconfigure_for_testing().await;
2620
2621        // Make the cache exterior immutable.
2622        let cache = cache;
2623
2624        cache.insert("a", "alice").await;
2625        cache.insert("b", "bob").await;
2626        cache.insert("c", "cindy").await;
2627        assert_eq!(cache.get(&"a").await, Some("alice"));
2628        assert_eq!(cache.get(&"b").await, Some("bob"));
2629        assert_eq!(cache.get(&"c").await, Some("cindy"));
2630        assert!(cache.contains_key(&"a"));
2631        assert!(cache.contains_key(&"b"));
2632        assert!(cache.contains_key(&"c"));
2633
2634        // `cache.run_pending_tasks().await` is no longer needed here before invalidating. The last
2635        // modified timestamp of the entries were updated when they were inserted.
2636        // https://github.com/moka-rs/moka/issues/155
2637
2638        cache.invalidate_all();
2639        expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
2640        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
2641        expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
2642        cache.run_pending_tasks().await;
2643
2644        cache.insert("d", "david").await;
2645        cache.run_pending_tasks().await;
2646
2647        assert!(cache.get(&"a").await.is_none());
2648        assert!(cache.get(&"b").await.is_none());
2649        assert!(cache.get(&"c").await.is_none());
2650        assert_eq!(cache.get(&"d").await, Some("david"));
2651        assert!(!cache.contains_key(&"a"));
2652        assert!(!cache.contains_key(&"b"));
2653        assert!(!cache.contains_key(&"c"));
2654        assert!(cache.contains_key(&"d"));
2655
2656        verify_notification_vec(&cache, actual, &expected).await;
2657    }
2658
2659    // This test is for https://github.com/moka-rs/moka/issues/155
2660    #[tokio::test]
2661    async fn invalidate_all_without_running_pending_tasks() {
2662        let cache = Cache::new(1024);
2663
2664        assert_eq!(cache.get(&0).await, None);
2665        cache.insert(0, 1).await;
2666        assert_eq!(cache.get(&0).await, Some(1));
2667
2668        cache.invalidate_all();
2669        assert_eq!(cache.get(&0).await, None);
2670    }
2671
2672    #[tokio::test]
2673    async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
2674        use std::collections::HashSet;
2675
2676        // The following `Vec`s will hold actual and expected notifications.
2677        let actual = Arc::new(Mutex::new(Vec::new()));
2678        let mut expected = Vec::new();
2679
2680        // Create an eviction listener.
2681        let a1 = Arc::clone(&actual);
2682        let listener = move |k, v, cause| -> ListenerFuture {
2683            let a2 = Arc::clone(&a1);
2684            async move {
2685                a2.lock().await.push((k, v, cause));
2686            }
2687            .boxed()
2688        };
2689
2690        let (clock, mock) = Clock::mock();
2691
2692        // Create a cache with the eviction listener.
2693        let mut cache = Cache::builder()
2694            .max_capacity(100)
2695            .support_invalidation_closures()
2696            .async_eviction_listener(listener)
2697            .clock(clock)
2698            .build();
2699        cache.reconfigure_for_testing().await;
2700
2701        // Make the cache exterior immutable.
2702        let cache = cache;
2703
2704        cache.insert(0, "alice").await;
2705        cache.insert(1, "bob").await;
2706        cache.insert(2, "alex").await;
2707        cache.run_pending_tasks().await;
2708
2709        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2710        cache.run_pending_tasks().await;
2711
2712        assert_eq!(cache.get(&0).await, Some("alice"));
2713        assert_eq!(cache.get(&1).await, Some("bob"));
2714        assert_eq!(cache.get(&2).await, Some("alex"));
2715        assert!(cache.contains_key(&0));
2716        assert!(cache.contains_key(&1));
2717        assert!(cache.contains_key(&2));
2718
2719        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
2720        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
2721        assert_eq!(cache.invalidation_predicate_count(), 1);
2722        expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
2723        expected.push((Arc::new(2), "alex", RemovalCause::Explicit));
2724
2725        mock.increment(Duration::from_secs(5)); // 10 secs from the start.
2726
2727        cache.insert(3, "alice").await;
2728
2729        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2730        cache.run_pending_tasks().await; // To submit the invalidation task.
2731        std::thread::sleep(Duration::from_millis(200));
2732        cache.run_pending_tasks().await; // To process the task result.
2733        std::thread::sleep(Duration::from_millis(200));
2734
2735        assert!(cache.get(&0).await.is_none());
2736        assert!(cache.get(&2).await.is_none());
2737        assert_eq!(cache.get(&1).await, Some("bob"));
2738        // This should survive as it was inserted after calling invalidate_entries_if.
2739        assert_eq!(cache.get(&3).await, Some("alice"));
2740
2741        assert!(!cache.contains_key(&0));
2742        assert!(cache.contains_key(&1));
2743        assert!(!cache.contains_key(&2));
2744        assert!(cache.contains_key(&3));
2745
2746        assert_eq!(cache.entry_count(), 2);
2747        assert_eq!(cache.invalidation_predicate_count(), 0);
2748
2749        mock.increment(Duration::from_secs(5)); // 15 secs from the start.
2750
2751        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
2752        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
2753        assert_eq!(cache.invalidation_predicate_count(), 2);
2754        // key 1 was inserted before key 3.
2755        expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
2756        expected.push((Arc::new(3), "alice", RemovalCause::Explicit));
2757
2758        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
2759        cache.run_pending_tasks().await; // To submit the invalidation task.
2760        std::thread::sleep(Duration::from_millis(200));
2761        cache.run_pending_tasks().await; // To process the task result.
2762        std::thread::sleep(Duration::from_millis(200));
2763
2764        assert!(cache.get(&1).await.is_none());
2765        assert!(cache.get(&3).await.is_none());
2766
2767        assert!(!cache.contains_key(&1));
2768        assert!(!cache.contains_key(&3));
2769
2770        assert_eq!(cache.entry_count(), 0);
2771        assert_eq!(cache.invalidation_predicate_count(), 0);
2772
2773        verify_notification_vec(&cache, actual, &expected).await;
2774
2775        Ok(())
2776    }
2777
2778    #[tokio::test]
2779    async fn time_to_live() {
2780        // The following `Vec`s will hold actual and expected notifications.
2781        let actual = Arc::new(Mutex::new(Vec::new()));
2782        let mut expected = Vec::new();
2783
2784        // Create an eviction listener.
2785        let a1 = Arc::clone(&actual);
2786        let listener = move |k, v, cause| -> ListenerFuture {
2787            let a2 = Arc::clone(&a1);
2788            async move {
2789                a2.lock().await.push((k, v, cause));
2790            }
2791            .boxed()
2792        };
2793
2794        let (clock, mock) = Clock::mock();
2795
2796        // Create a cache with the eviction listener.
2797        let mut cache = Cache::builder()
2798            .max_capacity(100)
2799            .time_to_live(Duration::from_secs(10))
2800            .async_eviction_listener(listener)
2801            .clock(clock)
2802            .build();
2803        cache.reconfigure_for_testing().await;
2804
2805        // Make the cache exterior immutable.
2806        let cache = cache;
2807
2808        cache.insert("a", "alice").await;
2809        cache.run_pending_tasks().await;
2810
2811        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2812        cache.run_pending_tasks().await;
2813
2814        assert_eq!(cache.get(&"a").await, Some("alice"));
2815        assert!(cache.contains_key(&"a"));
2816
2817        mock.increment(Duration::from_secs(5)); // 10 secs.
2818        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2819        assert_eq!(cache.get(&"a").await, None);
2820        assert!(!cache.contains_key(&"a"));
2821
2822        assert_eq!(cache.iter().count(), 0);
2823
2824        cache.run_pending_tasks().await;
2825        assert!(cache.is_table_empty());
2826
2827        cache.insert("b", "bob").await;
2828        cache.run_pending_tasks().await;
2829
2830        assert_eq!(cache.entry_count(), 1);
2831
2832        mock.increment(Duration::from_secs(5)); // 15 secs.
2833        cache.run_pending_tasks().await;
2834
2835        assert_eq!(cache.get(&"b").await, Some("bob"));
2836        assert!(cache.contains_key(&"b"));
2837        assert_eq!(cache.entry_count(), 1);
2838
2839        cache.insert("b", "bill").await;
2840        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2841        cache.run_pending_tasks().await;
2842
2843        mock.increment(Duration::from_secs(5)); // 20 secs
2844        cache.run_pending_tasks().await;
2845
2846        assert_eq!(cache.get(&"b").await, Some("bill"));
2847        assert!(cache.contains_key(&"b"));
2848        assert_eq!(cache.entry_count(), 1);
2849
2850        mock.increment(Duration::from_secs(5)); // 25 secs
2851        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2852
2853        assert_eq!(cache.get(&"a").await, None);
2854        assert_eq!(cache.get(&"b").await, None);
2855        assert!(!cache.contains_key(&"a"));
2856        assert!(!cache.contains_key(&"b"));
2857
2858        assert_eq!(cache.iter().count(), 0);
2859
2860        cache.run_pending_tasks().await;
2861        assert!(cache.is_table_empty());
2862
2863        verify_notification_vec(&cache, actual, &expected).await;
2864    }
2865
2866    #[tokio::test]
2867    async fn time_to_idle() {
2868        // The following `Vec`s will hold actual and expected notifications.
2869        let actual = Arc::new(Mutex::new(Vec::new()));
2870        let mut expected = Vec::new();
2871
2872        // Create an eviction listener.
2873        let a1 = Arc::clone(&actual);
2874        let listener = move |k, v, cause| -> ListenerFuture {
2875            let a2 = Arc::clone(&a1);
2876            async move {
2877                a2.lock().await.push((k, v, cause));
2878            }
2879            .boxed()
2880        };
2881
2882        let (clock, mock) = Clock::mock();
2883
2884        // Create a cache with the eviction listener.
2885        let mut cache = Cache::builder()
2886            .max_capacity(100)
2887            .time_to_idle(Duration::from_secs(10))
2888            .async_eviction_listener(listener)
2889            .clock(clock)
2890            .build();
2891        cache.reconfigure_for_testing().await;
2892
2893        // Make the cache exterior immutable.
2894        let cache = cache;
2895
2896        cache.insert("a", "alice").await;
2897        cache.run_pending_tasks().await;
2898
2899        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2900        cache.run_pending_tasks().await;
2901
2902        assert_eq!(cache.get(&"a").await, Some("alice"));
2903
2904        mock.increment(Duration::from_secs(5)); // 10 secs.
2905        cache.run_pending_tasks().await;
2906
2907        cache.insert("b", "bob").await;
2908        cache.run_pending_tasks().await;
2909
2910        assert_eq!(cache.entry_count(), 2);
2911
2912        mock.increment(Duration::from_secs(2)); // 12 secs.
2913        cache.run_pending_tasks().await;
2914
2915        // contains_key does not reset the idle timer for the key.
2916        assert!(cache.contains_key(&"a"));
2917        assert!(cache.contains_key(&"b"));
2918        cache.run_pending_tasks().await;
2919
2920        assert_eq!(cache.entry_count(), 2);
2921
2922        mock.increment(Duration::from_secs(3)); // 15 secs.
2923        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2924
2925        assert_eq!(cache.get(&"a").await, None);
2926        assert_eq!(cache.get(&"b").await, Some("bob"));
2927        assert!(!cache.contains_key(&"a"));
2928        assert!(cache.contains_key(&"b"));
2929
2930        assert_eq!(cache.iter().count(), 1);
2931
2932        cache.run_pending_tasks().await;
2933        assert_eq!(cache.entry_count(), 1);
2934
2935        mock.increment(Duration::from_secs(10)); // 25 secs
2936        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2937
2938        assert_eq!(cache.get(&"a").await, None);
2939        assert_eq!(cache.get(&"b").await, None);
2940        assert!(!cache.contains_key(&"a"));
2941        assert!(!cache.contains_key(&"b"));
2942
2943        assert_eq!(cache.iter().count(), 0);
2944
2945        cache.run_pending_tasks().await;
2946        assert!(cache.is_table_empty());
2947
2948        verify_notification_vec(&cache, actual, &expected).await;
2949    }
2950
2951    // https://github.com/moka-rs/moka/issues/359
2952    #[tokio::test]
2953    async fn ensure_access_time_is_updated_immediately_after_read() {
2954        let (clock, mock) = Clock::mock();
2955        let mut cache = Cache::builder()
2956            .max_capacity(10)
2957            .time_to_idle(Duration::from_secs(5))
2958            .clock(clock)
2959            .build();
2960        cache.reconfigure_for_testing().await;
2961        // Make the cache exterior immutable.
2962        let cache = cache;
2963
2964        cache.insert(1, 1).await;
2965
2966        mock.increment(Duration::from_secs(4));
2967        assert_eq!(cache.get(&1).await, Some(1));
2968
2969        mock.increment(Duration::from_secs(2));
2970        assert_eq!(cache.get(&1).await, Some(1));
2971        cache.run_pending_tasks().await;
2972        assert_eq!(cache.get(&1).await, Some(1));
2973    }
2974
2975    #[tokio::test]
2976    async fn time_to_live_by_expiry_type() {
2977        // The following `Vec`s will hold actual and expected notifications.
2978        let actual = Arc::new(Mutex::new(Vec::new()));
2979        let mut expected = Vec::new();
2980
2981        // Define an expiry type.
2982        struct MyExpiry {
2983            counters: Arc<ExpiryCallCounters>,
2984        }
2985
2986        impl MyExpiry {
2987            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2988                Self { counters }
2989            }
2990        }
2991
2992        impl Expiry<&str, &str> for MyExpiry {
2993            fn expire_after_create(
2994                &self,
2995                _key: &&str,
2996                _value: &&str,
2997                _current_time: StdInstant,
2998            ) -> Option<Duration> {
2999                self.counters.incl_actual_creations();
3000                Some(Duration::from_secs(10))
3001            }
3002
3003            fn expire_after_update(
3004                &self,
3005                _key: &&str,
3006                _value: &&str,
3007                _current_time: StdInstant,
3008                _current_duration: Option<Duration>,
3009            ) -> Option<Duration> {
3010                self.counters.incl_actual_updates();
3011                Some(Duration::from_secs(10))
3012            }
3013        }
3014
3015        // Create an eviction listener.
3016        let a1 = Arc::clone(&actual);
3017        let listener = move |k, v, cause| -> ListenerFuture {
3018            let a2 = Arc::clone(&a1);
3019            async move {
3020                a2.lock().await.push((k, v, cause));
3021            }
3022            .boxed()
3023        };
3024
3025        // Create expiry counters and the expiry.
3026        let expiry_counters = Arc::new(ExpiryCallCounters::default());
3027        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
3028
3029        let (clock, mock) = Clock::mock();
3030
3031        // Create a cache with the expiry and eviction listener.
3032        let mut cache = Cache::builder()
3033            .max_capacity(100)
3034            .expire_after(expiry)
3035            .async_eviction_listener(listener)
3036            .clock(clock)
3037            .build();
3038        cache.reconfigure_for_testing().await;
3039
3040        // Make the cache exterior immutable.
3041        let cache = cache;
3042
3043        cache.insert("a", "alice").await;
3044        expiry_counters.incl_expected_creations();
3045        cache.run_pending_tasks().await;
3046
3047        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
3048        cache.run_pending_tasks().await;
3049
3050        assert_eq!(cache.get(&"a").await, Some("alice"));
3051        assert!(cache.contains_key(&"a"));
3052
3053        mock.increment(Duration::from_secs(5)); // 10 secs.
3054        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
3055        assert_eq!(cache.get(&"a").await, None);
3056        assert!(!cache.contains_key(&"a"));
3057
3058        assert_eq!(cache.iter().count(), 0);
3059
3060        cache.run_pending_tasks().await;
3061        assert!(cache.is_table_empty());
3062
3063        cache.insert("b", "bob").await;
3064        expiry_counters.incl_expected_creations();
3065        cache.run_pending_tasks().await;
3066
3067        assert_eq!(cache.entry_count(), 1);
3068
3069        mock.increment(Duration::from_secs(5)); // 15 secs.
3070        cache.run_pending_tasks().await;
3071
3072        assert_eq!(cache.get(&"b").await, Some("bob"));
3073        assert!(cache.contains_key(&"b"));
3074        assert_eq!(cache.entry_count(), 1);
3075
3076        cache.insert("b", "bill").await;
3077        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
3078        expiry_counters.incl_expected_updates();
3079        cache.run_pending_tasks().await;
3080
3081        mock.increment(Duration::from_secs(5)); // 20 secs
3082        cache.run_pending_tasks().await;
3083
3084        assert_eq!(cache.get(&"b").await, Some("bill"));
3085        assert!(cache.contains_key(&"b"));
3086        assert_eq!(cache.entry_count(), 1);
3087
3088        mock.increment(Duration::from_secs(5)); // 25 secs
3089        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
3090
3091        assert_eq!(cache.get(&"a").await, None);
3092        assert_eq!(cache.get(&"b").await, None);
3093        assert!(!cache.contains_key(&"a"));
3094        assert!(!cache.contains_key(&"b"));
3095
3096        assert_eq!(cache.iter().count(), 0);
3097
3098        cache.run_pending_tasks().await;
3099        assert!(cache.is_table_empty());
3100
3101        expiry_counters.verify();
3102        verify_notification_vec(&cache, actual, &expected).await;
3103    }
3104
3105    #[tokio::test]
3106    async fn time_to_idle_by_expiry_type() {
3107        // The following `Vec`s will hold actual and expected notifications.
3108        let actual = Arc::new(Mutex::new(Vec::new()));
3109        let mut expected = Vec::new();
3110
3111        // Define an expiry type.
3112        struct MyExpiry {
3113            counters: Arc<ExpiryCallCounters>,
3114        }
3115
3116        impl MyExpiry {
3117            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
3118                Self { counters }
3119            }
3120        }
3121
3122        impl Expiry<&str, &str> for MyExpiry {
3123            fn expire_after_read(
3124                &self,
3125                _key: &&str,
3126                _value: &&str,
3127                _current_time: StdInstant,
3128                _current_duration: Option<Duration>,
3129                _last_modified_at: StdInstant,
3130            ) -> Option<Duration> {
3131                self.counters.incl_actual_reads();
3132                Some(Duration::from_secs(10))
3133            }
3134        }
3135
3136        // Create an eviction listener.
3137        let a1 = Arc::clone(&actual);
3138        let listener = move |k, v, cause| -> ListenerFuture {
3139            let a2 = Arc::clone(&a1);
3140            async move {
3141                a2.lock().await.push((k, v, cause));
3142            }
3143            .boxed()
3144        };
3145
3146        // Create expiry counters and the expiry.
3147        let expiry_counters = Arc::new(ExpiryCallCounters::default());
3148        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
3149
3150        let (clock, mock) = Clock::mock();
3151
3152        // Create a cache with the expiry and eviction listener.
3153        let mut cache = Cache::builder()
3154            .max_capacity(100)
3155            .expire_after(expiry)
3156            .async_eviction_listener(listener)
3157            .clock(clock)
3158            .build();
3159        cache.reconfigure_for_testing().await;
3160
3161        // Make the cache exterior immutable.
3162        let cache = cache;
3163
3164        cache.insert("a", "alice").await;
3165        cache.run_pending_tasks().await;
3166
3167        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
3168        cache.run_pending_tasks().await;
3169
3170        assert_eq!(cache.get(&"a").await, Some("alice"));
3171        expiry_counters.incl_expected_reads();
3172
3173        mock.increment(Duration::from_secs(5)); // 10 secs.
3174        cache.run_pending_tasks().await;
3175
3176        cache.insert("b", "bob").await;
3177        cache.run_pending_tasks().await;
3178
3179        assert_eq!(cache.entry_count(), 2);
3180
3181        mock.increment(Duration::from_secs(2)); // 12 secs.
3182        cache.run_pending_tasks().await;
3183
3184        // contains_key does not reset the idle timer for the key.
3185        assert!(cache.contains_key(&"a"));
3186        assert!(cache.contains_key(&"b"));
3187        cache.run_pending_tasks().await;
3188
3189        assert_eq!(cache.entry_count(), 2);
3190
3191        mock.increment(Duration::from_secs(3)); // 15 secs.
3192        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
3193
3194        assert_eq!(cache.get(&"a").await, None);
3195        assert_eq!(cache.get(&"b").await, Some("bob"));
3196        expiry_counters.incl_expected_reads();
3197        assert!(!cache.contains_key(&"a"));
3198        assert!(cache.contains_key(&"b"));
3199
3200        assert_eq!(cache.iter().count(), 1);
3201
3202        cache.run_pending_tasks().await;
3203        assert_eq!(cache.entry_count(), 1);
3204
3205        mock.increment(Duration::from_secs(10)); // 25 secs
3206        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
3207
3208        assert_eq!(cache.get(&"a").await, None);
3209        assert_eq!(cache.get(&"b").await, None);
3210        assert!(!cache.contains_key(&"a"));
3211        assert!(!cache.contains_key(&"b"));
3212
3213        assert_eq!(cache.iter().count(), 0);
3214
3215        cache.run_pending_tasks().await;
3216        assert!(cache.is_table_empty());
3217
3218        expiry_counters.verify();
3219        verify_notification_vec(&cache, actual, &expected).await;
3220    }
3221
3222    /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
3223    /// only when the key was already present in the cache.
3224    #[tokio::test]
3225    async fn test_expiry_using_get_with() {
3226        // Define an expiry type, which always return `None`.
3227        struct NoExpiry {
3228            counters: Arc<ExpiryCallCounters>,
3229        }
3230
3231        impl NoExpiry {
3232            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
3233                Self { counters }
3234            }
3235        }
3236
3237        impl Expiry<&str, &str> for NoExpiry {
3238            fn expire_after_create(
3239                &self,
3240                _key: &&str,
3241                _value: &&str,
3242                _current_time: StdInstant,
3243            ) -> Option<Duration> {
3244                self.counters.incl_actual_creations();
3245                None
3246            }
3247
3248            fn expire_after_read(
3249                &self,
3250                _key: &&str,
3251                _value: &&str,
3252                _current_time: StdInstant,
3253                _current_duration: Option<Duration>,
3254                _last_modified_at: StdInstant,
3255            ) -> Option<Duration> {
3256                self.counters.incl_actual_reads();
3257                None
3258            }
3259
3260            fn expire_after_update(
3261                &self,
3262                _key: &&str,
3263                _value: &&str,
3264                _current_time: StdInstant,
3265                _current_duration: Option<Duration>,
3266            ) -> Option<Duration> {
3267                unreachable!("The `expire_after_update()` method should not be called.");
3268            }
3269        }
3270
3271        // Create expiry counters and the expiry.
3272        let expiry_counters = Arc::new(ExpiryCallCounters::default());
3273        let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
3274
3275        // Create a cache with the expiry and eviction listener.
3276        let mut cache = Cache::builder()
3277            .max_capacity(100)
3278            .expire_after(expiry)
3279            .build();
3280        cache.reconfigure_for_testing().await;
3281
3282        // Make the cache exterior immutable.
3283        let cache = cache;
3284
3285        // The key is not present.
3286        cache.get_with("a", async { "alice" }).await;
3287        expiry_counters.incl_expected_creations();
3288        cache.run_pending_tasks().await;
3289
3290        // The key is present.
3291        cache.get_with("a", async { "alex" }).await;
3292        expiry_counters.incl_expected_reads();
3293        cache.run_pending_tasks().await;
3294
3295        // The key is not present.
3296        cache.invalidate("a").await;
3297        cache.get_with("a", async { "amanda" }).await;
3298        expiry_counters.incl_expected_creations();
3299        cache.run_pending_tasks().await;
3300
3301        expiry_counters.verify();
3302    }
3303
3304    // https://github.com/moka-rs/moka/issues/345
3305    #[tokio::test]
3306    async fn test_race_between_updating_entry_and_processing_its_write_ops() {
3307        let (clock, mock) = Clock::mock();
3308        let cache = Cache::builder()
3309            .max_capacity(2)
3310            .time_to_idle(Duration::from_secs(1))
3311            .clock(clock)
3312            .build();
3313
3314        cache.insert("a", "alice").await;
3315        cache.insert("b", "bob").await;
3316        cache.insert("c", "cathy").await; // c1
3317        mock.increment(Duration::from_secs(2));
3318
3319        // The following `insert` will do the followings:
3320        // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
3321        // 2. Runs the pending tasks implicitly.
3322        //    (1) "a" will be admitted.
3323        //    (2) "b" will be admitted.
3324        //    (3) c1 will be evicted by size constraint.
3325        //    (4) "a" will be evicted due to expiration.
3326        //    (5) "b" will be evicted due to expiration.
3327        // 3. Send its `WriteOp` log to the channel.
3328        cache.insert("c", "cindy").await; // c2
3329
3330        // Remove "c" (c2) from the cht.
3331        assert_eq!(cache.remove(&"c").await, Some("cindy")); // c-remove
3332
3333        mock.increment(Duration::from_secs(2));
3334
3335        // The following `run_pending_tasks` will do the followings:
3336        // 1. Admits "c" (c2) to the cache. (Create a node in the LRU deque)
3337        // 2. Because of c-remove, removes c2's node from the LRU deque.
3338        cache.run_pending_tasks().await;
3339        assert_eq!(cache.entry_count(), 0);
3340    }
3341
3342    #[tokio::test]
3343    async fn test_race_between_recreating_entry_and_processing_its_write_ops() {
3344        let cache = Cache::builder().max_capacity(2).build();
3345
3346        cache.insert('a', "a").await;
3347        cache.insert('b', "b").await;
3348        cache.run_pending_tasks().await;
3349
3350        cache.insert('c', "c1").await; // (a) `EntryInfo` 1, gen: 1
3351        assert!(cache.remove(&'a').await.is_some()); // (b)
3352        assert!(cache.remove(&'b').await.is_some()); // (c)
3353        assert!(cache.remove(&'c').await.is_some()); // (d) `EntryInfo` 1, gen: 2
3354        cache.insert('c', "c2").await; // (e) `EntryInfo` 2, gen: 1
3355
3356        // Now the `write_op_ch` channel contains the following `WriteOp`s:
3357        //
3358        // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
3359        // - 1: (b) remove "a"
3360        // - 2: (c) remove "b"
3361        // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
3362        // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
3363        //
3364        // 0 for "c1" is going to be rejected because the cache is full. Let's ensure
3365        // processing 0 must not remove "c2" from the concurrent hash table. (Their
3366        // gen are the same, but `EntryInfo`s are different)
3367        cache.run_pending_tasks().await;
3368        assert_eq!(cache.get(&'c').await, Some("c2"));
3369    }
3370
3371    #[tokio::test]
3372    async fn test_iter() {
3373        const NUM_KEYS: usize = 50;
3374
3375        fn make_value(key: usize) -> String {
3376            format!("val: {key}")
3377        }
3378
3379        let cache = Cache::builder()
3380            .max_capacity(100)
3381            .time_to_idle(Duration::from_secs(10))
3382            .build();
3383
3384        for key in 0..NUM_KEYS {
3385            cache.insert(key, make_value(key)).await;
3386        }
3387
3388        let mut key_set = std::collections::HashSet::new();
3389
3390        for (key, value) in &cache {
3391            assert_eq!(value, make_value(*key));
3392
3393            key_set.insert(*key);
3394        }
3395
3396        // Ensure there are no missing or duplicate keys in the iteration.
3397        assert_eq!(key_set.len(), NUM_KEYS);
3398    }
3399
3400    /// Runs 16 async tasks at the same time and ensures no deadlock occurs.
3401    ///
3402    /// - Eight of the task will update key-values in the cache.
3403    /// - Eight others will iterate the cache.
3404    ///
3405    #[tokio::test]
3406    async fn test_iter_multi_async_tasks() {
3407        use std::collections::HashSet;
3408
3409        const NUM_KEYS: usize = 1024;
3410        const NUM_TASKS: usize = 16;
3411
3412        fn make_value(key: usize) -> String {
3413            format!("val: {key}")
3414        }
3415
3416        let cache = Cache::builder()
3417            .max_capacity(2048)
3418            .time_to_idle(Duration::from_secs(10))
3419            .build();
3420
3421        // Initialize the cache.
3422        for key in 0..NUM_KEYS {
3423            cache.insert(key, make_value(key)).await;
3424        }
3425
3426        let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
3427        let write_lock = rw_lock.write().await;
3428
3429        let tasks = (0..NUM_TASKS)
3430            .map(|n| {
3431                let cache = cache.clone();
3432                let rw_lock = Arc::clone(&rw_lock);
3433
3434                if n % 2 == 0 {
3435                    // This thread will update the cache.
3436                    tokio::spawn(async move {
3437                        let read_lock = rw_lock.read().await;
3438                        for key in 0..NUM_KEYS {
3439                            // TODO: Update keys in a random order?
3440                            cache.insert(key, make_value(key)).await;
3441                        }
3442                        std::mem::drop(read_lock);
3443                    })
3444                } else {
3445                    // This thread will iterate the cache.
3446                    tokio::spawn(async move {
3447                        let read_lock = rw_lock.read().await;
3448                        let mut key_set = HashSet::new();
3449                        // let mut key_count = 0usize;
3450                        for (key, value) in &cache {
3451                            assert_eq!(value, make_value(*key));
3452                            key_set.insert(*key);
3453                            // key_count += 1;
3454                        }
3455                        // Ensure there are no missing or duplicate keys in the iteration.
3456                        assert_eq!(key_set.len(), NUM_KEYS);
3457                        std::mem::drop(read_lock);
3458                    })
3459                }
3460            })
3461            .collect::<Vec<_>>();
3462
3463        // Let these threads to run by releasing the write lock.
3464        std::mem::drop(write_lock);
3465
3466        let _ = futures_util::future::join_all(tasks).await;
3467
3468        // Ensure there are no missing or duplicate keys in the iteration.
3469        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
3470        assert_eq!(key_set.len(), NUM_KEYS);
3471    }
3472
3473    #[tokio::test]
3474    async fn get_with() {
3475        let cache = Cache::new(100);
3476        const KEY: u32 = 0;
3477
3478        // This test will run five async tasks:
3479        //
3480        // Task1 will be the first task to call `get_with` for a key, so its async
3481        // block will be evaluated and then a &str value "task1" will be inserted to
3482        // the cache.
3483        let task1 = {
3484            let cache1 = cache.clone();
3485            async move {
3486                // Call `get_with` immediately.
3487                let v = cache1
3488                    .get_with(KEY, async {
3489                        // Wait for 300 ms and return a &str value.
3490                        sleep(Duration::from_millis(300)).await;
3491                        "task1"
3492                    })
3493                    .await;
3494                assert_eq!(v, "task1");
3495            }
3496        };
3497
3498        // Task2 will be the second task to call `get_with` for the same key, so its
3499        // async block will not be evaluated. Once task1's async block finishes, it
3500        // will get the value inserted by task1's async block.
3501        let task2 = {
3502            let cache2 = cache.clone();
3503            async move {
3504                // Wait for 100 ms before calling `get_with`.
3505                sleep(Duration::from_millis(100)).await;
3506                let v = cache2.get_with(KEY, async { unreachable!() }).await;
3507                assert_eq!(v, "task1");
3508            }
3509        };
3510
3511        // Task3 will be the third task to call `get_with` for the same key. By the
3512        // time it calls, task1's async block should have finished already and the
3513        // value should be already inserted to the cache. So its async block will not
3514        // be evaluated and will get the value inserted by task1's async block
3515        // immediately.
3516        let task3 = {
3517            let cache3 = cache.clone();
3518            async move {
3519                // Wait for 400 ms before calling `get_with`.
3520                sleep(Duration::from_millis(400)).await;
3521                let v = cache3.get_with(KEY, async { unreachable!() }).await;
3522                assert_eq!(v, "task1");
3523            }
3524        };
3525
3526        // Task4 will call `get` for the same key. It will call when task1's async
3527        // block is still running, so it will get none for the key.
3528        let task4 = {
3529            let cache4 = cache.clone();
3530            async move {
3531                // Wait for 200 ms before calling `get`.
3532                sleep(Duration::from_millis(200)).await;
3533                let maybe_v = cache4.get(&KEY).await;
3534                assert!(maybe_v.is_none());
3535            }
3536        };
3537
3538        // Task5 will call `get` for the same key. It will call after task1's async
3539        // block finished, so it will get the value insert by task1's async block.
3540        let task5 = {
3541            let cache5 = cache.clone();
3542            async move {
3543                // Wait for 400 ms before calling `get`.
3544                sleep(Duration::from_millis(400)).await;
3545                let maybe_v = cache5.get(&KEY).await;
3546                assert_eq!(maybe_v, Some("task1"));
3547            }
3548        };
3549
3550        futures_util::join!(task1, task2, task3, task4, task5);
3551
3552        assert!(cache.is_waiter_map_empty());
3553    }
3554
3555    #[tokio::test]
3556    async fn get_with_by_ref() {
3557        let cache = Cache::new(100);
3558        const KEY: &u32 = &0;
3559
3560        // This test will run five async tasks:
3561        //
3562        // Task1 will be the first task to call `get_with_by_ref` for a key, so its async
3563        // block will be evaluated and then a &str value "task1" will be inserted to
3564        // the cache.
3565        let task1 = {
3566            let cache1 = cache.clone();
3567            async move {
3568                // Call `get_with_by_ref` immediately.
3569                let v = cache1
3570                    .get_with_by_ref(KEY, async {
3571                        // Wait for 300 ms and return a &str value.
3572                        sleep(Duration::from_millis(300)).await;
3573                        "task1"
3574                    })
3575                    .await;
3576                assert_eq!(v, "task1");
3577            }
3578        };
3579
3580        // Task2 will be the second task to call `get_with_by_ref` for the same key, so its
3581        // async block will not be evaluated. Once task1's async block finishes, it
3582        // will get the value inserted by task1's async block.
3583        let task2 = {
3584            let cache2 = cache.clone();
3585            async move {
3586                // Wait for 100 ms before calling `get_with_by_ref`.
3587                sleep(Duration::from_millis(100)).await;
3588                let v = cache2.get_with_by_ref(KEY, async { unreachable!() }).await;
3589                assert_eq!(v, "task1");
3590            }
3591        };
3592
3593        // Task3 will be the third task to call `get_with_by_ref` for the same key. By the
3594        // time it calls, task1's async block should have finished already and the
3595        // value should be already inserted to the cache. So its async block will not
3596        // be evaluated and will get the value inserted by task1's async block
3597        // immediately.
3598        let task3 = {
3599            let cache3 = cache.clone();
3600            async move {
3601                // Wait for 400 ms before calling `get_with_by_ref`.
3602                sleep(Duration::from_millis(400)).await;
3603                let v = cache3.get_with_by_ref(KEY, async { unreachable!() }).await;
3604                assert_eq!(v, "task1");
3605            }
3606        };
3607
3608        // Task4 will call `get` for the same key. It will call when task1's async
3609        // block is still running, so it will get none for the key.
3610        let task4 = {
3611            let cache4 = cache.clone();
3612            async move {
3613                // Wait for 200 ms before calling `get`.
3614                sleep(Duration::from_millis(200)).await;
3615                let maybe_v = cache4.get(KEY).await;
3616                assert!(maybe_v.is_none());
3617            }
3618        };
3619
3620        // Task5 will call `get` for the same key. It will call after task1's async
3621        // block finished, so it will get the value insert by task1's async block.
3622        let task5 = {
3623            let cache5 = cache.clone();
3624            async move {
3625                // Wait for 400 ms before calling `get`.
3626                sleep(Duration::from_millis(400)).await;
3627                let maybe_v = cache5.get(KEY).await;
3628                assert_eq!(maybe_v, Some("task1"));
3629            }
3630        };
3631
3632        futures_util::join!(task1, task2, task3, task4, task5);
3633
3634        assert!(cache.is_waiter_map_empty());
3635    }
3636
3637    #[tokio::test]
3638    async fn entry_or_insert_with_if() {
3639        let cache = Cache::new(100);
3640        const KEY: u32 = 0;
3641
3642        // This test will run seven async tasks:
3643        //
3644        // Task1 will be the first task to call `or_insert_with_if` for a key, so its
3645        // async block will be evaluated and then a &str value "task1" will be
3646        // inserted to the cache.
3647        let task1 = {
3648            let cache1 = cache.clone();
3649            async move {
3650                // Call `or_insert_with_if` immediately.
3651                let entry = cache1
3652                    .entry(KEY)
3653                    .or_insert_with_if(
3654                        async {
3655                            // Wait for 300 ms and return a &str value.
3656                            sleep(Duration::from_millis(300)).await;
3657                            "task1"
3658                        },
3659                        |_v| unreachable!(),
3660                    )
3661                    .await;
3662                // Entry should be fresh because our async block should have been
3663                // evaluated.
3664                assert!(entry.is_fresh());
3665                assert_eq!(entry.into_value(), "task1");
3666            }
3667        };
3668
3669        // Task2 will be the second task to call `or_insert_with_if` for the same
3670        // key, so its async block will not be evaluated. Once task1's async block
3671        // finishes, it will get the value inserted by task1's async block.
3672        let task2 = {
3673            let cache2 = cache.clone();
3674            async move {
3675                // Wait for 100 ms before calling `or_insert_with_if`.
3676                sleep(Duration::from_millis(100)).await;
3677                let entry = cache2
3678                    .entry(KEY)
3679                    .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
3680                    .await;
3681                // Entry should not be fresh because task1's async block should have
3682                // been evaluated instead of ours.
3683                assert!(!entry.is_fresh());
3684                assert_eq!(entry.into_value(), "task1");
3685            }
3686        };
3687
3688        // Task3 will be the third task to call `or_insert_with_if` for the same key.
3689        // By the time it calls, task1's async block should have finished already and
3690        // the value should be already inserted to the cache. Also task3's
3691        // `replace_if` closure returns `false`. So its async block will not be
3692        // evaluated and will get the value inserted by task1's async block
3693        // immediately.
3694        let task3 = {
3695            let cache3 = cache.clone();
3696            async move {
3697                // Wait for 350 ms before calling `or_insert_with_if`.
3698                sleep(Duration::from_millis(350)).await;
3699                let entry = cache3
3700                    .entry(KEY)
3701                    .or_insert_with_if(async { unreachable!() }, |v| {
3702                        assert_eq!(v, &"task1");
3703                        false
3704                    })
3705                    .await;
3706                assert!(!entry.is_fresh());
3707                assert_eq!(entry.into_value(), "task1");
3708            }
3709        };
3710
3711        // Task4 will be the fourth task to call `or_insert_with_if` for the same
3712        // key. The value should have been already inserted to the cache by task1.
3713        // However task4's `replace_if` closure returns `true`. So its async block
3714        // will be evaluated to replace the current value.
3715        let task4 = {
3716            let cache4 = cache.clone();
3717            async move {
3718                // Wait for 400 ms before calling `or_insert_with_if`.
3719                sleep(Duration::from_millis(400)).await;
3720                let entry = cache4
3721                    .entry(KEY)
3722                    .or_insert_with_if(async { "task4" }, |v| {
3723                        assert_eq!(v, &"task1");
3724                        true
3725                    })
3726                    .await;
3727                assert!(entry.is_fresh());
3728                assert_eq!(entry.into_value(), "task4");
3729            }
3730        };
3731
3732        // Task5 will call `get` for the same key. It will call when task1's async
3733        // block is still running, so it will get none for the key.
3734        let task5 = {
3735            let cache5 = cache.clone();
3736            async move {
3737                // Wait for 200 ms before calling `get`.
3738                sleep(Duration::from_millis(200)).await;
3739                let maybe_v = cache5.get(&KEY).await;
3740                assert!(maybe_v.is_none());
3741            }
3742        };
3743
3744        // Task6 will call `get` for the same key. It will call after task1's async
3745        // block finished, so it will get the value insert by task1's async block.
3746        let task6 = {
3747            let cache6 = cache.clone();
3748            async move {
3749                // Wait for 350 ms before calling `get`.
3750                sleep(Duration::from_millis(350)).await;
3751                let maybe_v = cache6.get(&KEY).await;
3752                assert_eq!(maybe_v, Some("task1"));
3753            }
3754        };
3755
3756        // Task7 will call `get` for the same key. It will call after task4's async
3757        // block finished, so it will get the value insert by task4's async block.
3758        let task7 = {
3759            let cache7 = cache.clone();
3760            async move {
3761                // Wait for 450 ms before calling `get`.
3762                sleep(Duration::from_millis(450)).await;
3763                let maybe_v = cache7.get(&KEY).await;
3764                assert_eq!(maybe_v, Some("task4"));
3765            }
3766        };
3767
3768        futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
3769
3770        assert!(cache.is_waiter_map_empty());
3771    }
3772
3773    #[tokio::test]
3774    async fn entry_by_ref_or_insert_with_if() {
3775        let cache = Cache::new(100);
3776        const KEY: &u32 = &0;
3777
3778        // This test will run seven async tasks:
3779        //
3780        // Task1 will be the first task to call `or_insert_with_if` for a key, so its
3781        // async block will be evaluated and then a &str value "task1" will be
3782        // inserted to the cache.
3783        let task1 = {
3784            let cache1 = cache.clone();
3785            async move {
3786                // Call `or_insert_with_if` immediately.
3787                let entry = cache1
3788                    .entry_by_ref(KEY)
3789                    .or_insert_with_if(
3790                        async {
3791                            // Wait for 300 ms and return a &str value.
3792                            sleep(Duration::from_millis(300)).await;
3793                            "task1"
3794                        },
3795                        |_v| unreachable!(),
3796                    )
3797                    .await;
3798                // Entry should be fresh because our async block should have been
3799                // evaluated.
3800                assert!(entry.is_fresh());
3801                assert_eq!(entry.into_value(), "task1");
3802            }
3803        };
3804
3805        // Task2 will be the second task to call `or_insert_with_if` for the same
3806        // key, so its async block will not be evaluated. Once task1's async block
3807        // finishes, it will get the value inserted by task1's async block.
3808        let task2 = {
3809            let cache2 = cache.clone();
3810            async move {
3811                // Wait for 100 ms before calling `or_insert_with_if`.
3812                sleep(Duration::from_millis(100)).await;
3813                let entry = cache2
3814                    .entry_by_ref(KEY)
3815                    .or_insert_with_if(async { unreachable!() }, |_v| unreachable!())
3816                    .await;
3817                // Entry should not be fresh because task1's async block should have
3818                // been evaluated instead of ours.
3819                assert!(!entry.is_fresh());
3820                assert_eq!(entry.into_value(), "task1");
3821            }
3822        };
3823
3824        // Task3 will be the third task to call `or_insert_with_if` for the same key.
3825        // By the time it calls, task1's async block should have finished already and
3826        // the value should be already inserted to the cache. Also task3's
3827        // `replace_if` closure returns `false`. So its async block will not be
3828        // evaluated and will get the value inserted by task1's async block
3829        // immediately.
3830        let task3 = {
3831            let cache3 = cache.clone();
3832            async move {
3833                // Wait for 350 ms before calling `or_insert_with_if`.
3834                sleep(Duration::from_millis(350)).await;
3835                let entry = cache3
3836                    .entry_by_ref(KEY)
3837                    .or_insert_with_if(async { unreachable!() }, |v| {
3838                        assert_eq!(v, &"task1");
3839                        false
3840                    })
3841                    .await;
3842                assert!(!entry.is_fresh());
3843                assert_eq!(entry.into_value(), "task1");
3844            }
3845        };
3846
3847        // Task4 will be the fourth task to call `or_insert_with_if` for the same
3848        // key. The value should have been already inserted to the cache by task1.
3849        // However task4's `replace_if` closure returns `true`. So its async block
3850        // will be evaluated to replace the current value.
3851        let task4 = {
3852            let cache4 = cache.clone();
3853            async move {
3854                // Wait for 400 ms before calling `or_insert_with_if`.
3855                sleep(Duration::from_millis(400)).await;
3856                let entry = cache4
3857                    .entry_by_ref(KEY)
3858                    .or_insert_with_if(async { "task4" }, |v| {
3859                        assert_eq!(v, &"task1");
3860                        true
3861                    })
3862                    .await;
3863                assert!(entry.is_fresh());
3864                assert_eq!(entry.into_value(), "task4");
3865            }
3866        };
3867
3868        // Task5 will call `get` for the same key. It will call when task1's async
3869        // block is still running, so it will get none for the key.
3870        let task5 = {
3871            let cache5 = cache.clone();
3872            async move {
3873                // Wait for 200 ms before calling `get`.
3874                sleep(Duration::from_millis(200)).await;
3875                let maybe_v = cache5.get(KEY).await;
3876                assert!(maybe_v.is_none());
3877            }
3878        };
3879
3880        // Task6 will call `get` for the same key. It will call after task1's async
3881        // block finished, so it will get the value insert by task1's async block.
3882        let task6 = {
3883            let cache6 = cache.clone();
3884            async move {
3885                // Wait for 350 ms before calling `get`.
3886                sleep(Duration::from_millis(350)).await;
3887                let maybe_v = cache6.get(KEY).await;
3888                assert_eq!(maybe_v, Some("task1"));
3889            }
3890        };
3891
3892        // Task7 will call `get` for the same key. It will call after task4's async
3893        // block finished, so it will get the value insert by task4's async block.
3894        let task7 = {
3895            let cache7 = cache.clone();
3896            async move {
3897                // Wait for 450 ms before calling `get`.
3898                sleep(Duration::from_millis(450)).await;
3899                let maybe_v = cache7.get(KEY).await;
3900                assert_eq!(maybe_v, Some("task4"));
3901            }
3902        };
3903
3904        futures_util::join!(task1, task2, task3, task4, task5, task6, task7);
3905
3906        assert!(cache.is_waiter_map_empty());
3907    }
3908
3909    #[tokio::test]
3910    async fn try_get_with() {
3911        use std::sync::Arc;
3912
3913        // Note that MyError does not implement std::error::Error trait
3914        // like anyhow::Error.
3915        #[derive(Debug)]
3916        pub struct MyError(#[allow(dead_code)] String);
3917
3918        type MyResult<T> = Result<T, Arc<MyError>>;
3919
3920        let cache = Cache::new(100);
3921        const KEY: u32 = 0;
3922
3923        // This test will run eight async tasks:
3924        //
3925        // Task1 will be the first task to call `get_with` for a key, so its async
3926        // block will be evaluated and then an error will be returned. Nothing will
3927        // be inserted to the cache.
3928        let task1 = {
3929            let cache1 = cache.clone();
3930            async move {
3931                // Call `try_get_with` immediately.
3932                let v = cache1
3933                    .try_get_with(KEY, async {
3934                        // Wait for 300 ms and return an error.
3935                        sleep(Duration::from_millis(300)).await;
3936                        Err(MyError("task1 error".into()))
3937                    })
3938                    .await;
3939                assert!(v.is_err());
3940            }
3941        };
3942
3943        // Task2 will be the second task to call `get_with` for the same key, so its
3944        // async block will not be evaluated. Once task1's async block finishes, it
3945        // will get the same error value returned by task1's async block.
3946        let task2 = {
3947            let cache2 = cache.clone();
3948            async move {
3949                // Wait for 100 ms before calling `try_get_with`.
3950                sleep(Duration::from_millis(100)).await;
3951                let v: MyResult<_> = cache2.try_get_with(KEY, async { unreachable!() }).await;
3952                assert!(v.is_err());
3953            }
3954        };
3955
3956        // Task3 will be the third task to call `get_with` for the same key. By the
3957        // time it calls, task1's async block should have finished already, but the
3958        // key still does not exist in the cache. So its async block will be
3959        // evaluated and then an okay &str value will be returned. That value will be
3960        // inserted to the cache.
3961        let task3 = {
3962            let cache3 = cache.clone();
3963            async move {
3964                // Wait for 400 ms before calling `try_get_with`.
3965                sleep(Duration::from_millis(400)).await;
3966                let v: MyResult<_> = cache3
3967                    .try_get_with(KEY, async {
3968                        // Wait for 300 ms and return an Ok(&str) value.
3969                        sleep(Duration::from_millis(300)).await;
3970                        Ok("task3")
3971                    })
3972                    .await;
3973                assert_eq!(v.unwrap(), "task3");
3974            }
3975        };
3976
3977        // Task4 will be the fourth task to call `get_with` for the same key. So its
3978        // async block will not be evaluated. Once task3's async block finishes, it
3979        // will get the same okay &str value.
3980        let task4 = {
3981            let cache4 = cache.clone();
3982            async move {
3983                // Wait for 500 ms before calling `try_get_with`.
3984                sleep(Duration::from_millis(500)).await;
3985                let v: MyResult<_> = cache4.try_get_with(KEY, async { unreachable!() }).await;
3986                assert_eq!(v.unwrap(), "task3");
3987            }
3988        };
3989
3990        // Task5 will be the fifth task to call `get_with` for the same key. So its
3991        // async block will not be evaluated. By the time it calls, task3's async
3992        // block should have finished already, so its async block will not be
3993        // evaluated and will get the value insert by task3's async block
3994        // immediately.
3995        let task5 = {
3996            let cache5 = cache.clone();
3997            async move {
3998                // Wait for 800 ms before calling `try_get_with`.
3999                sleep(Duration::from_millis(800)).await;
4000                let v: MyResult<_> = cache5.try_get_with(KEY, async { unreachable!() }).await;
4001                assert_eq!(v.unwrap(), "task3");
4002            }
4003        };
4004
4005        // Task6 will call `get` for the same key. It will call when task1's async
4006        // block is still running, so it will get none for the key.
4007        let task6 = {
4008            let cache6 = cache.clone();
4009            async move {
4010                // Wait for 200 ms before calling `get`.
4011                sleep(Duration::from_millis(200)).await;
4012                let maybe_v = cache6.get(&KEY).await;
4013                assert!(maybe_v.is_none());
4014            }
4015        };
4016
4017        // Task7 will call `get` for the same key. It will call after task1's async
4018        // block finished with an error. So it will get none for the key.
4019        let task7 = {
4020            let cache7 = cache.clone();
4021            async move {
4022                // Wait for 400 ms before calling `get`.
4023                sleep(Duration::from_millis(400)).await;
4024                let maybe_v = cache7.get(&KEY).await;
4025                assert!(maybe_v.is_none());
4026            }
4027        };
4028
4029        // Task8 will call `get` for the same key. It will call after task3's async
4030        // block finished, so it will get the value insert by task3's async block.
4031        let task8 = {
4032            let cache8 = cache.clone();
4033            async move {
4034                // Wait for 800 ms before calling `get`.
4035                sleep(Duration::from_millis(800)).await;
4036                let maybe_v = cache8.get(&KEY).await;
4037                assert_eq!(maybe_v, Some("task3"));
4038            }
4039        };
4040
4041        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4042
4043        assert!(cache.is_waiter_map_empty());
4044    }
4045
4046    #[tokio::test]
4047    async fn try_get_with_by_ref() {
4048        use std::sync::Arc;
4049
4050        // Note that MyError does not implement std::error::Error trait
4051        // like anyhow::Error.
4052        #[derive(Debug)]
4053        pub struct MyError(#[allow(dead_code)] String);
4054
4055        type MyResult<T> = Result<T, Arc<MyError>>;
4056
4057        let cache = Cache::new(100);
4058        const KEY: &u32 = &0;
4059
4060        // This test will run eight async tasks:
4061        //
4062        // Task1 will be the first task to call `try_get_with_by_ref` for a key, so
4063        // its async block will be evaluated and then an error will be returned.
4064        // Nothing will be inserted to the cache.
4065        let task1 = {
4066            let cache1 = cache.clone();
4067            async move {
4068                // Call `try_get_with_by_ref` immediately.
4069                let v = cache1
4070                    .try_get_with_by_ref(KEY, async {
4071                        // Wait for 300 ms and return an error.
4072                        sleep(Duration::from_millis(300)).await;
4073                        Err(MyError("task1 error".into()))
4074                    })
4075                    .await;
4076                assert!(v.is_err());
4077            }
4078        };
4079
4080        // Task2 will be the second task to call `get_with` for the same key, so its
4081        // async block will not be evaluated. Once task1's async block finishes, it
4082        // will get the same error value returned by task1's async block.
4083        let task2 = {
4084            let cache2 = cache.clone();
4085            async move {
4086                // Wait for 100 ms before calling `try_get_with_by_ref`.
4087                sleep(Duration::from_millis(100)).await;
4088                let v: MyResult<_> = cache2
4089                    .try_get_with_by_ref(KEY, async { unreachable!() })
4090                    .await;
4091                assert!(v.is_err());
4092            }
4093        };
4094
4095        // Task3 will be the third task to call `get_with` for the same key. By the
4096        // time it calls, task1's async block should have finished already, but the
4097        // key still does not exist in the cache. So its async block will be
4098        // evaluated and then an okay &str value will be returned. That value will be
4099        // inserted to the cache.
4100        let task3 = {
4101            let cache3 = cache.clone();
4102            async move {
4103                // Wait for 400 ms before calling `try_get_with_by_ref`.
4104                sleep(Duration::from_millis(400)).await;
4105                let v: MyResult<_> = cache3
4106                    .try_get_with_by_ref(KEY, async {
4107                        // Wait for 300 ms and return an Ok(&str) value.
4108                        sleep(Duration::from_millis(300)).await;
4109                        Ok("task3")
4110                    })
4111                    .await;
4112                assert_eq!(v.unwrap(), "task3");
4113            }
4114        };
4115
4116        // Task4 will be the fourth task to call `get_with` for the same key. So its
4117        // async block will not be evaluated. Once task3's async block finishes, it
4118        // will get the same okay &str value.
4119        let task4 = {
4120            let cache4 = cache.clone();
4121            async move {
4122                // Wait for 500 ms before calling `try_get_with_by_ref`.
4123                sleep(Duration::from_millis(500)).await;
4124                let v: MyResult<_> = cache4
4125                    .try_get_with_by_ref(KEY, async { unreachable!() })
4126                    .await;
4127                assert_eq!(v.unwrap(), "task3");
4128            }
4129        };
4130
4131        // Task5 will be the fifth task to call `get_with` for the same key. So its
4132        // async block will not be evaluated. By the time it calls, task3's async
4133        // block should have finished already, so its async block will not be
4134        // evaluated and will get the value insert by task3's async block
4135        // immediately.
4136        let task5 = {
4137            let cache5 = cache.clone();
4138            async move {
4139                // Wait for 800 ms before calling `try_get_with_by_ref`.
4140                sleep(Duration::from_millis(800)).await;
4141                let v: MyResult<_> = cache5
4142                    .try_get_with_by_ref(KEY, async { unreachable!() })
4143                    .await;
4144                assert_eq!(v.unwrap(), "task3");
4145            }
4146        };
4147
4148        // Task6 will call `get` for the same key. It will call when task1's async
4149        // block is still running, so it will get none for the key.
4150        let task6 = {
4151            let cache6 = cache.clone();
4152            async move {
4153                // Wait for 200 ms before calling `get`.
4154                sleep(Duration::from_millis(200)).await;
4155                let maybe_v = cache6.get(KEY).await;
4156                assert!(maybe_v.is_none());
4157            }
4158        };
4159
4160        // Task7 will call `get` for the same key. It will call after task1's async
4161        // block finished with an error. So it will get none for the key.
4162        let task7 = {
4163            let cache7 = cache.clone();
4164            async move {
4165                // Wait for 400 ms before calling `get`.
4166                sleep(Duration::from_millis(400)).await;
4167                let maybe_v = cache7.get(KEY).await;
4168                assert!(maybe_v.is_none());
4169            }
4170        };
4171
4172        // Task8 will call `get` for the same key. It will call after task3's async
4173        // block finished, so it will get the value insert by task3's async block.
4174        let task8 = {
4175            let cache8 = cache.clone();
4176            async move {
4177                // Wait for 800 ms before calling `get`.
4178                sleep(Duration::from_millis(800)).await;
4179                let maybe_v = cache8.get(KEY).await;
4180                assert_eq!(maybe_v, Some("task3"));
4181            }
4182        };
4183
4184        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4185
4186        assert!(cache.is_waiter_map_empty());
4187    }
4188
4189    #[tokio::test]
4190    async fn optionally_get_with() {
4191        let cache = Cache::new(100);
4192        const KEY: u32 = 0;
4193
4194        // This test will run eight async tasks:
4195        //
4196        // Task1 will be the first task to call `optionally_get_with` for a key,
4197        // so its async block will be evaluated and then an None will be
4198        // returned. Nothing will be inserted to the cache.
4199        let task1 = {
4200            let cache1 = cache.clone();
4201            async move {
4202                // Call `try_get_with` immediately.
4203                let v = cache1
4204                    .optionally_get_with(KEY, async {
4205                        // Wait for 300 ms and return an None.
4206                        sleep(Duration::from_millis(300)).await;
4207                        None
4208                    })
4209                    .await;
4210                assert!(v.is_none());
4211            }
4212        };
4213
4214        // Task2 will be the second task to call `optionally_get_with` for the same
4215        // key, so its async block will not be evaluated. Once task1's async block
4216        // finishes, it will get the same error value returned by task1's async
4217        // block.
4218        let task2 = {
4219            let cache2 = cache.clone();
4220            async move {
4221                // Wait for 100 ms before calling `optionally_get_with`.
4222                sleep(Duration::from_millis(100)).await;
4223                let v = cache2
4224                    .optionally_get_with(KEY, async { unreachable!() })
4225                    .await;
4226                assert!(v.is_none());
4227            }
4228        };
4229
4230        // Task3 will be the third task to call `optionally_get_with` for the
4231        // same key. By the time it calls, task1's async block should have
4232        // finished already, but the key still does not exist in the cache. So
4233        // its async block will be evaluated and then an okay &str value will be
4234        // returned. That value will be inserted to the cache.
4235        let task3 = {
4236            let cache3 = cache.clone();
4237            async move {
4238                // Wait for 400 ms before calling `optionally_get_with`.
4239                sleep(Duration::from_millis(400)).await;
4240                let v = cache3
4241                    .optionally_get_with(KEY, async {
4242                        // Wait for 300 ms and return an Some(&str) value.
4243                        sleep(Duration::from_millis(300)).await;
4244                        Some("task3")
4245                    })
4246                    .await;
4247                assert_eq!(v.unwrap(), "task3");
4248            }
4249        };
4250
4251        // Task4 will be the fourth task to call `optionally_get_with` for the
4252        // same key. So its async block will not be evaluated. Once task3's
4253        // async block finishes, it will get the same okay &str value.
4254        let task4 = {
4255            let cache4 = cache.clone();
4256            async move {
4257                // Wait for 500 ms before calling `try_get_with`.
4258                sleep(Duration::from_millis(500)).await;
4259                let v = cache4
4260                    .optionally_get_with(KEY, async { unreachable!() })
4261                    .await;
4262                assert_eq!(v.unwrap(), "task3");
4263            }
4264        };
4265
4266        // Task5 will be the fifth task to call `optionally_get_with` for the
4267        // same key. So its async block will not be evaluated. By the time it
4268        // calls, task3's async block should have finished already, so its async
4269        // block will not be evaluated and will get the value insert by task3's
4270        // async block immediately.
4271        let task5 = {
4272            let cache5 = cache.clone();
4273            async move {
4274                // Wait for 800 ms before calling `optionally_get_with`.
4275                sleep(Duration::from_millis(800)).await;
4276                let v = cache5
4277                    .optionally_get_with(KEY, async { unreachable!() })
4278                    .await;
4279                assert_eq!(v.unwrap(), "task3");
4280            }
4281        };
4282
4283        // Task6 will call `get` for the same key. It will call when task1's async
4284        // block is still running, so it will get none for the key.
4285        let task6 = {
4286            let cache6 = cache.clone();
4287            async move {
4288                // Wait for 200 ms before calling `get`.
4289                sleep(Duration::from_millis(200)).await;
4290                let maybe_v = cache6.get(&KEY).await;
4291                assert!(maybe_v.is_none());
4292            }
4293        };
4294
4295        // Task7 will call `get` for the same key. It will call after task1's async
4296        // block finished with an error. So it will get none for the key.
4297        let task7 = {
4298            let cache7 = cache.clone();
4299            async move {
4300                // Wait for 400 ms before calling `get`.
4301                sleep(Duration::from_millis(400)).await;
4302                let maybe_v = cache7.get(&KEY).await;
4303                assert!(maybe_v.is_none());
4304            }
4305        };
4306
4307        // Task8 will call `get` for the same key. It will call after task3's async
4308        // block finished, so it will get the value insert by task3's async block.
4309        let task8 = {
4310            let cache8 = cache.clone();
4311            async move {
4312                // Wait for 800 ms before calling `get`.
4313                sleep(Duration::from_millis(800)).await;
4314                let maybe_v = cache8.get(&KEY).await;
4315                assert_eq!(maybe_v, Some("task3"));
4316            }
4317        };
4318
4319        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4320    }
4321
4322    #[tokio::test]
4323    async fn optionally_get_with_by_ref() {
4324        let cache = Cache::new(100);
4325        const KEY: &u32 = &0;
4326
4327        // This test will run eight async tasks:
4328        //
4329        // Task1 will be the first task to call `optionally_get_with_by_ref` for a
4330        // key, so its async block will be evaluated and then an None will be
4331        // returned. Nothing will be inserted to the cache.
4332        let task1 = {
4333            let cache1 = cache.clone();
4334            async move {
4335                // Call `try_get_with` immediately.
4336                let v = cache1
4337                    .optionally_get_with_by_ref(KEY, async {
4338                        // Wait for 300 ms and return an None.
4339                        sleep(Duration::from_millis(300)).await;
4340                        None
4341                    })
4342                    .await;
4343                assert!(v.is_none());
4344            }
4345        };
4346
4347        // Task2 will be the second task to call `optionally_get_with_by_ref` for the
4348        // same key, so its async block will not be evaluated. Once task1's async
4349        // block finishes, it will get the same error value returned by task1's async
4350        // block.
4351        let task2 = {
4352            let cache2 = cache.clone();
4353            async move {
4354                // Wait for 100 ms before calling `optionally_get_with_by_ref`.
4355                sleep(Duration::from_millis(100)).await;
4356                let v = cache2
4357                    .optionally_get_with_by_ref(KEY, async { unreachable!() })
4358                    .await;
4359                assert!(v.is_none());
4360            }
4361        };
4362
4363        // Task3 will be the third task to call `optionally_get_with_by_ref` for the
4364        // same key. By the time it calls, task1's async block should have
4365        // finished already, but the key still does not exist in the cache. So
4366        // its async block will be evaluated and then an okay &str value will be
4367        // returned. That value will be inserted to the cache.
4368        let task3 = {
4369            let cache3 = cache.clone();
4370            async move {
4371                // Wait for 400 ms before calling `optionally_get_with_by_ref`.
4372                sleep(Duration::from_millis(400)).await;
4373                let v = cache3
4374                    .optionally_get_with_by_ref(KEY, async {
4375                        // Wait for 300 ms and return an Some(&str) value.
4376                        sleep(Duration::from_millis(300)).await;
4377                        Some("task3")
4378                    })
4379                    .await;
4380                assert_eq!(v.unwrap(), "task3");
4381            }
4382        };
4383
4384        // Task4 will be the fourth task to call `optionally_get_with_by_ref` for the
4385        // same key. So its async block will not be evaluated. Once task3's
4386        // async block finishes, it will get the same okay &str value.
4387        let task4 = {
4388            let cache4 = cache.clone();
4389            async move {
4390                // Wait for 500 ms before calling `try_get_with`.
4391                sleep(Duration::from_millis(500)).await;
4392                let v = cache4
4393                    .optionally_get_with_by_ref(KEY, async { unreachable!() })
4394                    .await;
4395                assert_eq!(v.unwrap(), "task3");
4396            }
4397        };
4398
4399        // Task5 will be the fifth task to call `optionally_get_with_by_ref` for the
4400        // same key. So its async block will not be evaluated. By the time it
4401        // calls, task3's async block should have finished already, so its async
4402        // block will not be evaluated and will get the value insert by task3's
4403        // async block immediately.
4404        let task5 = {
4405            let cache5 = cache.clone();
4406            async move {
4407                // Wait for 800 ms before calling `optionally_get_with_by_ref`.
4408                sleep(Duration::from_millis(800)).await;
4409                let v = cache5
4410                    .optionally_get_with_by_ref(KEY, async { unreachable!() })
4411                    .await;
4412                assert_eq!(v.unwrap(), "task3");
4413            }
4414        };
4415
4416        // Task6 will call `get` for the same key. It will call when task1's async
4417        // block is still running, so it will get none for the key.
4418        let task6 = {
4419            let cache6 = cache.clone();
4420            async move {
4421                // Wait for 200 ms before calling `get`.
4422                sleep(Duration::from_millis(200)).await;
4423                let maybe_v = cache6.get(KEY).await;
4424                assert!(maybe_v.is_none());
4425            }
4426        };
4427
4428        // Task7 will call `get` for the same key. It will call after task1's async
4429        // block finished with an error. So it will get none for the key.
4430        let task7 = {
4431            let cache7 = cache.clone();
4432            async move {
4433                // Wait for 400 ms before calling `get`.
4434                sleep(Duration::from_millis(400)).await;
4435                let maybe_v = cache7.get(KEY).await;
4436                assert!(maybe_v.is_none());
4437            }
4438        };
4439
4440        // Task8 will call `get` for the same key. It will call after task3's async
4441        // block finished, so it will get the value insert by task3's async block.
4442        let task8 = {
4443            let cache8 = cache.clone();
4444            async move {
4445                // Wait for 800 ms before calling `get`.
4446                sleep(Duration::from_millis(800)).await;
4447                let maybe_v = cache8.get(KEY).await;
4448                assert_eq!(maybe_v, Some("task3"));
4449            }
4450        };
4451
4452        futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
4453
4454        assert!(cache.is_waiter_map_empty());
4455    }
4456
4457    #[tokio::test]
4458    async fn upsert_with() {
4459        let cache = Cache::new(100);
4460        const KEY: u32 = 0;
4461
4462        // Spawn three async tasks to call `and_upsert_with` for the same key and
4463        // each task increments the current value by 1. Ensure the key-level lock is
4464        // working by verifying the value is 3 after all tasks finish.
4465        //
4466        // |        |  task 1  |  task 2  |  task 3  |
4467        // |--------|----------|----------|----------|
4468        // |   0 ms | get none |          |          |
4469        // | 100 ms |          | blocked  |          |
4470        // | 200 ms | insert 1 |          |          |
4471        // |        |          | get 1    |          |
4472        // | 300 ms |          |          | blocked  |
4473        // | 400 ms |          | insert 2 |          |
4474        // |        |          |          | get 2    |
4475        // | 500 ms |          |          | insert 3 |
4476
4477        let task1 = {
4478            let cache1 = cache.clone();
4479            async move {
4480                cache1
4481                    .entry(KEY)
4482                    .and_upsert_with(|maybe_entry| async move {
4483                        sleep(Duration::from_millis(200)).await;
4484                        assert!(maybe_entry.is_none());
4485                        1
4486                    })
4487                    .await
4488            }
4489        };
4490
4491        let task2 = {
4492            let cache2 = cache.clone();
4493            async move {
4494                sleep(Duration::from_millis(100)).await;
4495                cache2
4496                    .entry_by_ref(&KEY)
4497                    .and_upsert_with(|maybe_entry| async move {
4498                        sleep(Duration::from_millis(200)).await;
4499                        let entry = maybe_entry.expect("The entry should exist");
4500                        entry.into_value() + 1
4501                    })
4502                    .await
4503            }
4504        };
4505
4506        let task3 = {
4507            let cache3 = cache.clone();
4508            async move {
4509                sleep(Duration::from_millis(300)).await;
4510                cache3
4511                    .entry_by_ref(&KEY)
4512                    .and_upsert_with(|maybe_entry| async move {
4513                        sleep(Duration::from_millis(100)).await;
4514                        let entry = maybe_entry.expect("The entry should exist");
4515                        entry.into_value() + 1
4516                    })
4517                    .await
4518            }
4519        };
4520
4521        let (ent1, ent2, ent3) = futures_util::join!(task1, task2, task3);
4522        assert_eq!(ent1.into_value(), 1);
4523        assert_eq!(ent2.into_value(), 2);
4524        assert_eq!(ent3.into_value(), 3);
4525
4526        assert_eq!(cache.get(&KEY).await, Some(3));
4527
4528        assert!(cache.is_waiter_map_empty());
4529    }
4530
4531    #[tokio::test]
4532    async fn compute_with() {
4533        use crate::ops::compute;
4534        use tokio::sync::RwLock;
4535
4536        let cache = Cache::new(100);
4537        const KEY: u32 = 0;
4538
4539        // Spawn six async tasks to call `and_compute_with` for the same key. Ensure
4540        // the key-level lock is working by verifying the value after all tasks
4541        // finish.
4542        //
4543        // |         |   task 1   |    task 2     |   task 3   |  task 4  |   task 5   | task 6  |
4544        // |---------|------------|---------------|------------|----------|------------|---------|
4545        // |    0 ms | get none   |               |            |          |            |         |
4546        // |  100 ms |            | blocked       |            |          |            |         |
4547        // |  200 ms | insert [1] |               |            |          |            |         |
4548        // |         |            | get [1]       |            |          |            |         |
4549        // |  300 ms |            |               | blocked    |          |            |         |
4550        // |  400 ms |            | insert [1, 2] |            |          |            |         |
4551        // |         |            |               | get [1, 2] |          |            |         |
4552        // |  500 ms |            |               |            | blocked  |            |         |
4553        // |  600 ms |            |               | remove     |          |            |         |
4554        // |         |            |               |            | get none |            |         |
4555        // |  700 ms |            |               |            |          | blocked    |         |
4556        // |  800 ms |            |               |            | nop      |            |         |
4557        // |         |            |               |            |          | get none   |         |
4558        // |  900 ms |            |               |            |          |            | blocked |
4559        // | 1000 ms |            |               |            |          | insert [5] |         |
4560        // |         |            |               |            |          |            | get [5] |
4561        // | 1100 ms |            |               |            |          |            | nop     |
4562
4563        let task1 = {
4564            let cache1 = cache.clone();
4565            async move {
4566                cache1
4567                    .entry(KEY)
4568                    .and_compute_with(|maybe_entry| async move {
4569                        sleep(Duration::from_millis(200)).await;
4570                        assert!(maybe_entry.is_none());
4571                        compute::Op::Put(Arc::new(RwLock::new(vec![1])))
4572                    })
4573                    .await
4574            }
4575        };
4576
4577        let task2 = {
4578            let cache2 = cache.clone();
4579            async move {
4580                sleep(Duration::from_millis(100)).await;
4581                cache2
4582                    .entry_by_ref(&KEY)
4583                    .and_compute_with(|maybe_entry| async move {
4584                        let entry = maybe_entry.expect("The entry should exist");
4585                        let value = entry.into_value();
4586                        assert_eq!(*value.read().await, vec![1]);
4587                        sleep(Duration::from_millis(200)).await;
4588                        value.write().await.push(2);
4589                        compute::Op::Put(value)
4590                    })
4591                    .await
4592            }
4593        };
4594
4595        let task3 = {
4596            let cache3 = cache.clone();
4597            async move {
4598                sleep(Duration::from_millis(300)).await;
4599                cache3
4600                    .entry(KEY)
4601                    .and_compute_with(|maybe_entry| async move {
4602                        let entry = maybe_entry.expect("The entry should exist");
4603                        let value = entry.into_value();
4604                        assert_eq!(*value.read().await, vec![1, 2]);
4605                        sleep(Duration::from_millis(200)).await;
4606                        compute::Op::Remove
4607                    })
4608                    .await
4609            }
4610        };
4611
4612        let task4 = {
4613            let cache4 = cache.clone();
4614            async move {
4615                sleep(Duration::from_millis(500)).await;
4616                cache4
4617                    .entry(KEY)
4618                    .and_compute_with(|maybe_entry| async move {
4619                        assert!(maybe_entry.is_none());
4620                        sleep(Duration::from_millis(200)).await;
4621                        compute::Op::Nop
4622                    })
4623                    .await
4624            }
4625        };
4626
4627        let task5 = {
4628            let cache5 = cache.clone();
4629            async move {
4630                sleep(Duration::from_millis(700)).await;
4631                cache5
4632                    .entry_by_ref(&KEY)
4633                    .and_compute_with(|maybe_entry| async move {
4634                        assert!(maybe_entry.is_none());
4635                        sleep(Duration::from_millis(200)).await;
4636                        compute::Op::Put(Arc::new(RwLock::new(vec![5])))
4637                    })
4638                    .await
4639            }
4640        };
4641
4642        let task6 = {
4643            let cache6 = cache.clone();
4644            async move {
4645                sleep(Duration::from_millis(900)).await;
4646                cache6
4647                    .entry_by_ref(&KEY)
4648                    .and_compute_with(|maybe_entry| async move {
4649                        let entry = maybe_entry.expect("The entry should exist");
4650                        let value = entry.into_value();
4651                        assert_eq!(*value.read().await, vec![5]);
4652                        sleep(Duration::from_millis(100)).await;
4653                        compute::Op::Nop
4654                    })
4655                    .await
4656            }
4657        };
4658
4659        let (res1, res2, res3, res4, res5, res6) =
4660            futures_util::join!(task1, task2, task3, task4, task5, task6);
4661
4662        let compute::CompResult::Inserted(entry) = res1 else {
4663            panic!("Expected `Inserted`. Got {res1:?}")
4664        };
4665        assert_eq!(
4666            *entry.into_value().read().await,
4667            vec![1, 2] // The same Vec was modified by task2.
4668        );
4669
4670        let compute::CompResult::ReplacedWith(entry) = res2 else {
4671            panic!("Expected `ReplacedWith`. Got {res2:?}")
4672        };
4673        assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4674
4675        let compute::CompResult::Removed(entry) = res3 else {
4676            panic!("Expected `Removed`. Got {res3:?}")
4677        };
4678        assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4679
4680        let compute::CompResult::StillNone(key) = res4 else {
4681            panic!("Expected `StillNone`. Got {res4:?}")
4682        };
4683        assert_eq!(*key, KEY);
4684
4685        let compute::CompResult::Inserted(entry) = res5 else {
4686            panic!("Expected `Inserted`. Got {res5:?}")
4687        };
4688        assert_eq!(*entry.into_value().read().await, vec![5]);
4689
4690        let compute::CompResult::Unchanged(entry) = res6 else {
4691            panic!("Expected `Unchanged`. Got {res6:?}")
4692        };
4693        assert_eq!(*entry.into_value().read().await, vec![5]);
4694
4695        assert!(cache.is_waiter_map_empty());
4696    }
4697
4698    #[tokio::test]
4699    async fn try_compute_with() {
4700        use crate::ops::compute;
4701        use tokio::sync::RwLock;
4702
4703        let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
4704        const KEY: u32 = 0;
4705
4706        // Spawn four async tasks to call `and_try_compute_with` for the same key.
4707        // Ensure the key-level lock is working by verifying the value after all
4708        // tasks finish.
4709        //
4710        // |         |   task 1   |    task 2     |   task 3   |  task 4    |
4711        // |---------|------------|---------------|------------|------------|
4712        // |    0 ms | get none   |               |            |            |
4713        // |  100 ms |            | blocked       |            |            |
4714        // |  200 ms | insert [1] |               |            |            |
4715        // |         |            | get [1]       |            |            |
4716        // |  300 ms |            |               | blocked    |            |
4717        // |  400 ms |            | insert [1, 2] |            |            |
4718        // |         |            |               | get [1, 2] |            |
4719        // |  500 ms |            |               |            | blocked    |
4720        // |  600 ms |            |               | err        |            |
4721        // |         |            |               |            | get [1, 2] |
4722        // |  700 ms |            |               |            | remove     |
4723        //
4724        // This test is shorter than `compute_with` test because this one omits `Nop`
4725        // cases.
4726
4727        let task1 = {
4728            let cache1 = cache.clone();
4729            async move {
4730                cache1
4731                    .entry(KEY)
4732                    .and_try_compute_with(|maybe_entry| async move {
4733                        sleep(Duration::from_millis(200)).await;
4734                        assert!(maybe_entry.is_none());
4735                        Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
4736                    })
4737                    .await
4738            }
4739        };
4740
4741        let task2 = {
4742            let cache2 = cache.clone();
4743            async move {
4744                sleep(Duration::from_millis(100)).await;
4745                cache2
4746                    .entry_by_ref(&KEY)
4747                    .and_try_compute_with(|maybe_entry| async move {
4748                        let entry = maybe_entry.expect("The entry should exist");
4749                        let value = entry.into_value();
4750                        assert_eq!(*value.read().await, vec![1]);
4751                        sleep(Duration::from_millis(200)).await;
4752                        value.write().await.push(2);
4753                        Ok(compute::Op::Put(value)) as Result<_, ()>
4754                    })
4755                    .await
4756            }
4757        };
4758
4759        let task3 = {
4760            let cache3 = cache.clone();
4761            async move {
4762                sleep(Duration::from_millis(300)).await;
4763                cache3
4764                    .entry(KEY)
4765                    .and_try_compute_with(|maybe_entry| async move {
4766                        let entry = maybe_entry.expect("The entry should exist");
4767                        let value = entry.into_value();
4768                        assert_eq!(*value.read().await, vec![1, 2]);
4769                        sleep(Duration::from_millis(200)).await;
4770                        Err(())
4771                    })
4772                    .await
4773            }
4774        };
4775
4776        let task4 = {
4777            let cache4 = cache.clone();
4778            async move {
4779                sleep(Duration::from_millis(500)).await;
4780                cache4
4781                    .entry(KEY)
4782                    .and_try_compute_with(|maybe_entry| async move {
4783                        let entry = maybe_entry.expect("The entry should exist");
4784                        let value = entry.into_value();
4785                        assert_eq!(*value.read().await, vec![1, 2]);
4786                        sleep(Duration::from_millis(100)).await;
4787                        Ok(compute::Op::Remove) as Result<_, ()>
4788                    })
4789                    .await
4790            }
4791        };
4792
4793        let (res1, res2, res3, res4) = futures_util::join!(task1, task2, task3, task4);
4794
4795        let Ok(compute::CompResult::Inserted(entry)) = res1 else {
4796            panic!("Expected `Inserted`. Got {res1:?}")
4797        };
4798        assert_eq!(
4799            *entry.into_value().read().await,
4800            vec![1, 2] // The same Vec was modified by task2.
4801        );
4802
4803        let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
4804            panic!("Expected `ReplacedWith`. Got {res2:?}")
4805        };
4806        assert_eq!(*entry.into_value().read().await, vec![1, 2]);
4807
4808        assert!(res3.is_err());
4809
4810        let Ok(compute::CompResult::Removed(entry)) = res4 else {
4811            panic!("Expected `Removed`. Got {res4:?}")
4812        };
4813        assert_eq!(
4814            *entry.into_value().read().await,
4815            vec![1, 2] // Removed value.
4816        );
4817
4818        assert!(cache.is_waiter_map_empty());
4819    }
4820
4821    #[tokio::test]
4822    // https://github.com/moka-rs/moka/issues/43
4823    async fn handle_panic_in_get_with() {
4824        use tokio::time::{sleep, Duration};
4825
4826        let cache = Cache::new(16);
4827        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4828        {
4829            let cache_ref = cache.clone();
4830            let semaphore_ref = semaphore.clone();
4831            tokio::task::spawn(async move {
4832                let _ = cache_ref
4833                    .get_with(1, async move {
4834                        semaphore_ref.add_permits(1);
4835                        sleep(Duration::from_millis(50)).await;
4836                        panic!("Panic during try_get_with");
4837                    })
4838                    .await;
4839            });
4840        }
4841        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4842        assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4843    }
4844
4845    #[tokio::test]
4846    // https://github.com/moka-rs/moka/issues/43
4847    async fn handle_panic_in_try_get_with() {
4848        use tokio::time::{sleep, Duration};
4849
4850        let cache = Cache::new(16);
4851        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4852        {
4853            let cache_ref = cache.clone();
4854            let semaphore_ref = semaphore.clone();
4855            tokio::task::spawn(async move {
4856                let _ = cache_ref
4857                    .try_get_with(1, async move {
4858                        semaphore_ref.add_permits(1);
4859                        sleep(Duration::from_millis(50)).await;
4860                        panic!("Panic during try_get_with");
4861                    })
4862                    .await as Result<_, Arc<Infallible>>;
4863            });
4864        }
4865        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4866        assert_eq!(
4867            cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4868            Ok(5)
4869        );
4870
4871        assert!(cache.is_waiter_map_empty());
4872    }
4873
4874    #[tokio::test]
4875    // https://github.com/moka-rs/moka/issues/59
4876    async fn abort_get_with() {
4877        use tokio::time::{sleep, Duration};
4878
4879        let cache = Cache::new(16);
4880        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4881
4882        let handle;
4883        {
4884            let cache_ref = cache.clone();
4885            let semaphore_ref = semaphore.clone();
4886
4887            handle = tokio::task::spawn(async move {
4888                let _ = cache_ref
4889                    .get_with(1, async move {
4890                        semaphore_ref.add_permits(1);
4891                        sleep(Duration::from_millis(50)).await;
4892                        unreachable!();
4893                    })
4894                    .await;
4895            });
4896        }
4897
4898        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4899        handle.abort();
4900
4901        assert_eq!(cache.get_with(1, async { 5 }).await, 5);
4902
4903        assert!(cache.is_waiter_map_empty());
4904    }
4905
4906    #[tokio::test]
4907    // https://github.com/moka-rs/moka/issues/59
4908    async fn abort_try_get_with() {
4909        use tokio::time::{sleep, Duration};
4910
4911        let cache = Cache::new(16);
4912        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
4913
4914        let handle;
4915        {
4916            let cache_ref = cache.clone();
4917            let semaphore_ref = semaphore.clone();
4918
4919            handle = tokio::task::spawn(async move {
4920                let _ = cache_ref
4921                    .try_get_with(1, async move {
4922                        semaphore_ref.add_permits(1);
4923                        sleep(Duration::from_millis(50)).await;
4924                        unreachable!();
4925                    })
4926                    .await as Result<_, Arc<Infallible>>;
4927            });
4928        }
4929
4930        let _ = semaphore.acquire().await.expect("semaphore acquire failed");
4931        handle.abort();
4932
4933        assert_eq!(
4934            cache.try_get_with(1, async { Ok(5) }).await as Result<_, Arc<Infallible>>,
4935            Ok(5)
4936        );
4937
4938        assert!(cache.is_waiter_map_empty());
4939    }
4940
4941    #[tokio::test]
4942    async fn test_removal_notifications() {
4943        // The following `Vec`s will hold actual and expected notifications.
4944        let actual = Arc::new(Mutex::new(Vec::new()));
4945        let mut expected = Vec::new();
4946
4947        // Create an eviction listener.
4948        let a1 = Arc::clone(&actual);
4949        let listener = move |k, v, cause| -> ListenerFuture {
4950            let a2 = Arc::clone(&a1);
4951            async move {
4952                a2.lock().await.push((k, v, cause));
4953            }
4954            .boxed()
4955        };
4956
4957        // Create a cache with the eviction listener.
4958        let mut cache = Cache::builder()
4959            .max_capacity(3)
4960            .async_eviction_listener(listener)
4961            .build();
4962        cache.reconfigure_for_testing().await;
4963
4964        // Make the cache exterior immutable.
4965        let cache = cache;
4966
4967        cache.insert('a', "alice").await;
4968        cache.invalidate(&'a').await;
4969        expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
4970
4971        cache.run_pending_tasks().await;
4972        assert_eq!(cache.entry_count(), 0);
4973
4974        cache.insert('b', "bob").await;
4975        cache.insert('c', "cathy").await;
4976        cache.insert('d', "david").await;
4977        cache.run_pending_tasks().await;
4978        assert_eq!(cache.entry_count(), 3);
4979
4980        // This will be rejected due to the size constraint.
4981        cache.insert('e', "emily").await;
4982        expected.push((Arc::new('e'), "emily", RemovalCause::Size));
4983        cache.run_pending_tasks().await;
4984        assert_eq!(cache.entry_count(), 3);
4985
4986        // Raise the popularity of 'e' so it will be accepted next time.
4987        cache.get(&'e').await;
4988        cache.run_pending_tasks().await;
4989
4990        // Retry.
4991        cache.insert('e', "eliza").await;
4992        // and the LRU entry will be evicted.
4993        expected.push((Arc::new('b'), "bob", RemovalCause::Size));
4994        cache.run_pending_tasks().await;
4995        assert_eq!(cache.entry_count(), 3);
4996
4997        // Replace an existing entry.
4998        cache.insert('d', "dennis").await;
4999        expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
5000        cache.run_pending_tasks().await;
5001        assert_eq!(cache.entry_count(), 3);
5002
5003        verify_notification_vec(&cache, actual, &expected).await;
5004    }
5005
5006    #[tokio::test]
5007    async fn test_removal_notifications_with_updates() {
5008        // The following `Vec`s will hold actual and expected notifications.
5009        let actual = Arc::new(Mutex::new(Vec::new()));
5010        let mut expected = Vec::new();
5011
5012        // Create an eviction listener.
5013        let a1 = Arc::clone(&actual);
5014        let listener = move |k, v, cause| -> ListenerFuture {
5015            let a2 = Arc::clone(&a1);
5016            async move {
5017                a2.lock().await.push((k, v, cause));
5018            }
5019            .boxed()
5020        };
5021
5022        let (clock, mock) = Clock::mock();
5023
5024        // Create a cache with the eviction listener and also TTL and TTI.
5025        let mut cache = Cache::builder()
5026            .async_eviction_listener(listener)
5027            .time_to_live(Duration::from_secs(7))
5028            .time_to_idle(Duration::from_secs(5))
5029            .clock(clock)
5030            .build();
5031        cache.reconfigure_for_testing().await;
5032
5033        // Make the cache exterior immutable.
5034        let cache = cache;
5035
5036        cache.insert("alice", "a0").await;
5037        cache.run_pending_tasks().await;
5038
5039        // Now alice (a0) has been expired by the idle timeout (TTI).
5040        mock.increment(Duration::from_secs(6));
5041        expected.push((Arc::new("alice"), "a0", RemovalCause::Expired));
5042        assert_eq!(cache.get(&"alice").await, None);
5043
5044        // We have not ran sync after the expiration of alice (a0), so it is
5045        // still in the cache.
5046        assert_eq!(cache.entry_count(), 1);
5047
5048        // Re-insert alice with a different value. Since alice (a0) is still
5049        // in the cache, this is actually a replace operation rather than an
5050        // insert operation. We want to verify that the RemovalCause of a0 is
5051        // Expired, not Replaced.
5052        cache.insert("alice", "a1").await;
5053        cache.run_pending_tasks().await;
5054
5055        mock.increment(Duration::from_secs(4));
5056        assert_eq!(cache.get(&"alice").await, Some("a1"));
5057        cache.run_pending_tasks().await;
5058
5059        // Now alice has been expired by time-to-live (TTL).
5060        mock.increment(Duration::from_secs(4));
5061        expected.push((Arc::new("alice"), "a1", RemovalCause::Expired));
5062        assert_eq!(cache.get(&"alice").await, None);
5063
5064        // But, again, it is still in the cache.
5065        assert_eq!(cache.entry_count(), 1);
5066
5067        // Re-insert alice with a different value and verify that the
5068        // RemovalCause of a1 is Expired (not Replaced).
5069        cache.insert("alice", "a2").await;
5070        cache.run_pending_tasks().await;
5071
5072        assert_eq!(cache.entry_count(), 1);
5073
5074        // Now alice (a2) has been expired by the idle timeout.
5075        mock.increment(Duration::from_secs(6));
5076        expected.push((Arc::new("alice"), "a2", RemovalCause::Expired));
5077        assert_eq!(cache.get(&"alice").await, None);
5078        assert_eq!(cache.entry_count(), 1);
5079
5080        // This invalidate will internally remove alice (a2).
5081        cache.invalidate(&"alice").await;
5082        cache.run_pending_tasks().await;
5083        assert_eq!(cache.entry_count(), 0);
5084
5085        // Re-insert, and this time, make it expired by the TTL.
5086        cache.insert("alice", "a3").await;
5087        cache.run_pending_tasks().await;
5088        mock.increment(Duration::from_secs(4));
5089        assert_eq!(cache.get(&"alice").await, Some("a3"));
5090        cache.run_pending_tasks().await;
5091        mock.increment(Duration::from_secs(4));
5092        expected.push((Arc::new("alice"), "a3", RemovalCause::Expired));
5093        assert_eq!(cache.get(&"alice").await, None);
5094        assert_eq!(cache.entry_count(), 1);
5095
5096        // This invalidate will internally remove alice (a2).
5097        cache.invalidate(&"alice").await;
5098        cache.run_pending_tasks().await;
5099        assert_eq!(cache.entry_count(), 0);
5100
5101        verify_notification_vec(&cache, actual, &expected).await;
5102    }
5103
5104    // When the eviction listener is not set, calling `run_pending_tasks` once should
5105    // evict all entries that can be removed.
5106    #[tokio::test]
5107    async fn no_batch_size_limit_on_eviction() {
5108        const MAX_CAPACITY: u64 = 20;
5109
5110        const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
5111        const MAX_LOG_SYNC_REPEATS: u32 = 1;
5112        const EVICTION_BATCH_SIZE: u32 = 1;
5113
5114        let hk_conf = HousekeeperConfig::new(
5115            // Timeout should be ignored when the eviction listener is not provided.
5116            Some(EVICTION_TIMEOUT),
5117            Some(MAX_LOG_SYNC_REPEATS),
5118            Some(EVICTION_BATCH_SIZE),
5119        );
5120
5121        // Create a cache with the LRU policy.
5122        let mut cache = Cache::builder()
5123            .max_capacity(MAX_CAPACITY)
5124            .eviction_policy(EvictionPolicy::lru())
5125            .housekeeper_config(hk_conf)
5126            .build();
5127        cache.reconfigure_for_testing().await;
5128
5129        // Make the cache exterior immutable.
5130        let cache = cache;
5131
5132        // Fill the cache.
5133        for i in 0..MAX_CAPACITY {
5134            let v = format!("v{i}");
5135            cache.insert(i, v).await
5136        }
5137        // The max capacity should not change because we have not called
5138        // `run_pending_tasks` yet.
5139        assert_eq!(cache.entry_count(), 0);
5140
5141        cache.run_pending_tasks().await;
5142        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5143
5144        // Insert more items to the cache.
5145        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5146            let v = format!("v{i}");
5147            cache.insert(i, v).await
5148        }
5149        // The max capacity should not change because we have not called
5150        // `run_pending_tasks` yet.
5151        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5152        // Both old and new keys should exist.
5153        assert!(cache.contains_key(&0)); // old
5154        assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
5155        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new
5156
5157        // Process the remaining write op logs (there should be MAX_CAPACITY logs),
5158        // and evict the LRU entries.
5159        cache.run_pending_tasks().await;
5160        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5161
5162        // Now all the old keys should be gone.
5163        assert!(!cache.contains_key(&0));
5164        assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
5165        // And the new keys should exist.
5166        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
5167    }
5168
5169    #[tokio::test]
5170    async fn slow_eviction_listener() {
5171        const MAX_CAPACITY: u64 = 20;
5172
5173        const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
5174        const LISTENER_DELAY: Duration = Duration::from_millis(11);
5175        const MAX_LOG_SYNC_REPEATS: u32 = 1;
5176        const EVICTION_BATCH_SIZE: u32 = 1;
5177
5178        let hk_conf = HousekeeperConfig::new(
5179            Some(EVICTION_TIMEOUT),
5180            Some(MAX_LOG_SYNC_REPEATS),
5181            Some(EVICTION_BATCH_SIZE),
5182        );
5183
5184        let (clock, mock) = Clock::mock();
5185        let listener_call_count = Arc::new(AtomicU8::new(0));
5186        let lcc = Arc::clone(&listener_call_count);
5187
5188        // A slow eviction listener that spend `LISTENER_DELAY` to process a removal
5189        // notification.
5190        let listener = move |_k, _v, _cause| {
5191            mock.increment(LISTENER_DELAY);
5192            lcc.fetch_add(1, Ordering::AcqRel);
5193        };
5194
5195        // Create a cache with the LRU policy.
5196        let mut cache = Cache::builder()
5197            .max_capacity(MAX_CAPACITY)
5198            .eviction_policy(EvictionPolicy::lru())
5199            .eviction_listener(listener)
5200            .housekeeper_config(hk_conf)
5201            .clock(clock)
5202            .build();
5203        cache.reconfigure_for_testing().await;
5204
5205        // Make the cache exterior immutable.
5206        let cache = cache;
5207
5208        // Fill the cache.
5209        for i in 0..MAX_CAPACITY {
5210            let v = format!("v{i}");
5211            cache.insert(i, v).await
5212        }
5213        // The max capacity should not change because we have not called
5214        // `run_pending_tasks` yet.
5215        assert_eq!(cache.entry_count(), 0);
5216
5217        cache.run_pending_tasks().await;
5218        assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
5219        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5220
5221        // Insert more items to the cache.
5222        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5223            let v = format!("v{i}");
5224            cache.insert(i, v).await
5225        }
5226        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5227
5228        cache.run_pending_tasks().await;
5229        // Because of the slow listener, cache should get an over capacity.
5230        let mut expected_call_count = 3;
5231        assert_eq!(
5232            listener_call_count.load(Ordering::Acquire) as u64,
5233            expected_call_count
5234        );
5235        assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
5236
5237        loop {
5238            cache.run_pending_tasks().await;
5239
5240            expected_call_count += 3;
5241            if expected_call_count > MAX_CAPACITY {
5242                expected_call_count = MAX_CAPACITY;
5243            }
5244
5245            let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
5246            assert_eq!(actual_count, expected_call_count);
5247            let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
5248            assert_eq!(cache.entry_count(), expected_entry_count);
5249
5250            if expected_call_count >= MAX_CAPACITY {
5251                break;
5252            }
5253        }
5254
5255        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5256    }
5257
5258    // NOTE: To enable the panic logging, run the following command:
5259    //
5260    // RUST_LOG=moka=info cargo test --features 'future, logging' -- \
5261    //   future::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
5262    //
5263    #[tokio::test]
5264    async fn recover_from_panicking_eviction_listener() {
5265        #[cfg(feature = "logging")]
5266        let _ = env_logger::builder().is_test(true).try_init();
5267
5268        // The following `Vec`s will hold actual and expected notifications.
5269        let actual = Arc::new(Mutex::new(Vec::new()));
5270        let mut expected = Vec::new();
5271
5272        // Create an eviction listener that panics when it see
5273        // a value "panic now!".
5274        let a1 = Arc::clone(&actual);
5275        let listener = move |k, v, cause| -> ListenerFuture {
5276            let a2 = Arc::clone(&a1);
5277            async move {
5278                if v == "panic now!" {
5279                    panic!("Panic now!");
5280                }
5281                a2.lock().await.push((k, v, cause));
5282            }
5283            .boxed()
5284        };
5285
5286        // Create a cache with the eviction listener.
5287        let mut cache = Cache::builder()
5288            .name("My Future Cache")
5289            .async_eviction_listener(listener)
5290            .build();
5291        cache.reconfigure_for_testing().await;
5292
5293        // Make the cache exterior immutable.
5294        let cache = cache;
5295
5296        // Insert an okay value.
5297        cache.insert("alice", "a0").await;
5298        cache.run_pending_tasks().await;
5299
5300        // Insert a value that will cause the eviction listener to panic.
5301        cache.insert("alice", "panic now!").await;
5302        expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
5303        cache.run_pending_tasks().await;
5304
5305        // Insert an okay value. This will replace the previous
5306        // value "panic now!" so the eviction listener will panic.
5307        cache.insert("alice", "a2").await;
5308        cache.run_pending_tasks().await;
5309        // No more removal notification should be sent.
5310
5311        // Invalidate the okay value.
5312        cache.invalidate(&"alice").await;
5313        cache.run_pending_tasks().await;
5314
5315        verify_notification_vec(&cache, actual, &expected).await;
5316    }
5317
5318    #[tokio::test]
5319    async fn cancel_future_while_running_pending_tasks() {
5320        use crate::future::FutureExt;
5321        use futures_util::future::poll_immediate;
5322        use tokio::task::yield_now;
5323
5324        let listener_initiation_count: Arc<AtomicU32> = Default::default();
5325        let listener_completion_count: Arc<AtomicU32> = Default::default();
5326
5327        let listener = {
5328            // Variables to capture.
5329            let init_count = Arc::clone(&listener_initiation_count);
5330            let comp_count = Arc::clone(&listener_completion_count);
5331
5332            // Our eviction listener closure.
5333            move |_k, _v, _r| {
5334                init_count.fetch_add(1, Ordering::AcqRel);
5335                let comp_count1 = Arc::clone(&comp_count);
5336
5337                async move {
5338                    yield_now().await;
5339                    comp_count1.fetch_add(1, Ordering::AcqRel);
5340                }
5341                .boxed()
5342            }
5343        };
5344
5345        let (clock, mock) = Clock::mock();
5346
5347        let mut cache: Cache<u32, u32> = Cache::builder()
5348            .time_to_live(Duration::from_millis(10))
5349            .async_eviction_listener(listener)
5350            .clock(clock)
5351            .build();
5352
5353        cache.reconfigure_for_testing().await;
5354
5355        // Make the cache exterior immutable.
5356        let cache = cache;
5357
5358        cache.insert(1, 1).await;
5359        assert_eq!(cache.run_pending_tasks_initiation_count(), 0);
5360        assert_eq!(cache.run_pending_tasks_completion_count(), 0);
5361
5362        // Key 1 is not yet expired.
5363        mock.increment(Duration::from_millis(7));
5364
5365        cache.run_pending_tasks().await;
5366        assert_eq!(cache.run_pending_tasks_initiation_count(), 1);
5367        assert_eq!(cache.run_pending_tasks_completion_count(), 1);
5368        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 0);
5369        assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5370
5371        // Now key 1 is expired, so the eviction listener should be called when we
5372        // call run_pending_tasks() and poll the returned future.
5373        mock.increment(Duration::from_millis(7));
5374
5375        let fut = cache.run_pending_tasks();
5376        // Poll the fut only once, and drop it. The fut should not be completed (so
5377        // it is cancelled) because the eviction listener performed a yield_now().
5378        assert!(poll_immediate(fut).await.is_none());
5379
5380        // The task is initiated but not completed.
5381        assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
5382        assert_eq!(cache.run_pending_tasks_completion_count(), 1);
5383        // The listener is initiated but not completed.
5384        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5385        assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5386
5387        // This will resume the task and the listener, and continue polling
5388        // until complete.
5389        cache.run_pending_tasks().await;
5390        // Now the task is completed.
5391        assert_eq!(cache.run_pending_tasks_initiation_count(), 2);
5392        assert_eq!(cache.run_pending_tasks_completion_count(), 2);
5393        // Now the listener is completed.
5394        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5395        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5396    }
5397
5398    #[tokio::test]
5399    async fn cancel_future_while_calling_eviction_listener() {
5400        use crate::future::FutureExt;
5401        use futures_util::future::poll_immediate;
5402        use tokio::task::yield_now;
5403
5404        let listener_initiation_count: Arc<AtomicU32> = Default::default();
5405        let listener_completion_count: Arc<AtomicU32> = Default::default();
5406
5407        let listener = {
5408            // Variables to capture.
5409            let init_count = Arc::clone(&listener_initiation_count);
5410            let comp_count = Arc::clone(&listener_completion_count);
5411
5412            // Our eviction listener closure.
5413            move |_k, _v, _r| {
5414                init_count.fetch_add(1, Ordering::AcqRel);
5415                let comp_count1 = Arc::clone(&comp_count);
5416
5417                async move {
5418                    yield_now().await;
5419                    comp_count1.fetch_add(1, Ordering::AcqRel);
5420                }
5421                .boxed()
5422            }
5423        };
5424
5425        let mut cache: Cache<u32, u32> = Cache::builder()
5426            .time_to_live(Duration::from_millis(10))
5427            .async_eviction_listener(listener)
5428            .build();
5429
5430        cache.reconfigure_for_testing().await;
5431
5432        // Make the cache exterior immutable.
5433        let cache = cache;
5434
5435        // ------------------------------------------------------------
5436        // Interrupt the eviction listener while calling `insert`
5437        // ------------------------------------------------------------
5438
5439        cache.insert(1, 1).await;
5440
5441        let fut = cache.insert(1, 2);
5442        // Poll the fut only once, and drop it. The fut should not be completed (so
5443        // it is cancelled) because the eviction listener performed a yield_now().
5444        assert!(poll_immediate(fut).await.is_none());
5445
5446        // The listener is initiated but not completed.
5447        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5448        assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5449
5450        // This will call retry_interrupted_ops() and resume the interrupted
5451        // listener.
5452        cache.run_pending_tasks().await;
5453        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5454        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5455
5456        // ------------------------------------------------------------
5457        // Interrupt the eviction listener while calling `invalidate`
5458        // ------------------------------------------------------------
5459
5460        let fut = cache.invalidate(&1);
5461        // Cancel the fut after one poll.
5462        assert!(poll_immediate(fut).await.is_none());
5463        // The listener is initiated but not completed.
5464        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5465        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5466
5467        // This will call retry_interrupted_ops() and resume the interrupted
5468        // listener.
5469        cache.get(&99).await;
5470        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 2);
5471        assert_eq!(listener_completion_count.load(Ordering::Acquire), 2);
5472
5473        // ------------------------------------------------------------
5474        // Ensure retry_interrupted_ops() is called
5475        // ------------------------------------------------------------
5476
5477        // Repeat the same test with `insert`, but this time, call different methods
5478        // to ensure retry_interrupted_ops() is called.
5479        let prepare = || async {
5480            cache.invalidate(&1).await;
5481
5482            // Reset the counters.
5483            listener_initiation_count.store(0, Ordering::Release);
5484            listener_completion_count.store(0, Ordering::Release);
5485
5486            cache.insert(1, 1).await;
5487
5488            let fut = cache.insert(1, 2);
5489            // Poll the fut only once, and drop it. The fut should not be completed (so
5490            // it is cancelled) because the eviction listener performed a yield_now().
5491            assert!(poll_immediate(fut).await.is_none());
5492
5493            // The listener is initiated but not completed.
5494            assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5495            assert_eq!(listener_completion_count.load(Ordering::Acquire), 0);
5496        };
5497
5498        // Methods to test:
5499        //
5500        // - run_pending_tasks (Already tested in a previous test)
5501        // - get               (Already tested in a previous test)
5502        // - insert
5503        // - invalidate
5504        // - remove
5505
5506        // insert
5507        prepare().await;
5508        cache.insert(99, 99).await;
5509        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5510        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5511
5512        // invalidate
5513        prepare().await;
5514        cache.invalidate(&88).await;
5515        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5516        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5517
5518        // remove
5519        prepare().await;
5520        cache.remove(&77).await;
5521        assert_eq!(listener_initiation_count.load(Ordering::Acquire), 1);
5522        assert_eq!(listener_completion_count.load(Ordering::Acquire), 1);
5523    }
5524
5525    #[tokio::test]
5526    async fn cancel_future_while_scheduling_write_op() {
5527        use futures_util::future::poll_immediate;
5528
5529        let mut cache: Cache<u32, u32> = Cache::builder().build();
5530        cache.reconfigure_for_testing().await;
5531
5532        // Make the cache exterior immutable.
5533        let cache = cache;
5534
5535        // --------------------------------------------------------------
5536        // Interrupt `insert` while blocking in `schedule_write_op`
5537        // --------------------------------------------------------------
5538
5539        cache
5540            .schedule_write_op_should_block
5541            .store(true, Ordering::Release);
5542        let fut = cache.insert(1, 1);
5543        // Poll the fut only once, and drop it. The fut should not be completed (so
5544        // it is cancelled) because schedule_write_op should be awaiting for a lock.
5545        assert!(poll_immediate(fut).await.is_none());
5546
5547        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5548        assert_eq!(cache.base.write_op_ch.len(), 0);
5549
5550        // This should retry the interrupted operation.
5551        cache
5552            .schedule_write_op_should_block
5553            .store(false, Ordering::Release);
5554        cache.get(&99).await;
5555        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5556        assert_eq!(cache.base.write_op_ch.len(), 1);
5557
5558        cache.run_pending_tasks().await;
5559        assert_eq!(cache.base.write_op_ch.len(), 0);
5560
5561        // --------------------------------------------------------------
5562        // Interrupt `invalidate` while blocking in `schedule_write_op`
5563        // --------------------------------------------------------------
5564
5565        cache
5566            .schedule_write_op_should_block
5567            .store(true, Ordering::Release);
5568        let fut = cache.invalidate(&1);
5569        // Poll the fut only once, and drop it. The fut should not be completed (so
5570        // it is cancelled) because schedule_write_op should be awaiting for a lock.
5571        assert!(poll_immediate(fut).await.is_none());
5572
5573        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 1);
5574        assert_eq!(cache.base.write_op_ch.len(), 0);
5575
5576        // This should retry the interrupted operation.
5577        cache
5578            .schedule_write_op_should_block
5579            .store(false, Ordering::Release);
5580        cache.get(&99).await;
5581        assert_eq!(cache.base.interrupted_op_ch_snd.len(), 0);
5582        assert_eq!(cache.base.write_op_ch.len(), 1);
5583
5584        cache.run_pending_tasks().await;
5585        assert_eq!(cache.base.write_op_ch.len(), 0);
5586    }
5587
5588    // This test ensures that the `contains_key`, `get` and `invalidate` can use
5589    // borrowed form `&[u8]` for key with type `Vec<u8>`.
5590    // https://github.com/moka-rs/moka/issues/166
5591    #[tokio::test]
5592    async fn borrowed_forms_of_key() {
5593        let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5594
5595        let key = vec![1_u8];
5596        cache.insert(key.clone(), ()).await;
5597
5598        // key as &Vec<u8>
5599        let key_v: &Vec<u8> = &key;
5600        assert!(cache.contains_key(key_v));
5601        assert_eq!(cache.get(key_v).await, Some(()));
5602        cache.invalidate(key_v).await;
5603
5604        cache.insert(key, ()).await;
5605
5606        // key as &[u8]
5607        let key_s: &[u8] = &[1_u8];
5608        assert!(cache.contains_key(key_s));
5609        assert_eq!(cache.get(key_s).await, Some(()));
5610        cache.invalidate(key_s).await;
5611    }
5612
5613    #[tokio::test]
5614    async fn drop_value_immediately_after_eviction() {
5615        use crate::common::test_utils::{Counters, Value};
5616
5617        const MAX_CAPACITY: u32 = 500;
5618        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
5619
5620        let counters = Arc::new(Counters::default());
5621        let counters1 = Arc::clone(&counters);
5622
5623        let listener = move |_k, _v, cause| match cause {
5624            RemovalCause::Size => counters1.incl_evicted(),
5625            RemovalCause::Explicit => counters1.incl_invalidated(),
5626            _ => (),
5627        };
5628
5629        let mut cache = Cache::builder()
5630            .max_capacity(MAX_CAPACITY as u64)
5631            .eviction_listener(listener)
5632            .build();
5633        cache.reconfigure_for_testing().await;
5634
5635        // Make the cache exterior immutable.
5636        let cache = cache;
5637
5638        for key in 0..KEYS {
5639            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
5640            cache.insert(key, value).await;
5641            counters.incl_inserted();
5642            cache.run_pending_tasks().await;
5643        }
5644
5645        let eviction_count = KEYS - MAX_CAPACITY;
5646
5647        // Retries will be needed when testing in a QEMU VM.
5648        const MAX_RETRIES: usize = 5;
5649        let mut retries = 0;
5650        loop {
5651            // Ensure all scheduled notifications have been processed.
5652            std::thread::sleep(Duration::from_millis(500));
5653
5654            if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
5655                if retries <= MAX_RETRIES {
5656                    retries += 1;
5657                    cache.run_pending_tasks().await;
5658                    continue;
5659                } else {
5660                    assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
5661                    assert_eq!(
5662                        counters.value_dropped(),
5663                        eviction_count,
5664                        "Retries exhausted"
5665                    );
5666                }
5667            }
5668
5669            assert_eq!(counters.inserted(), KEYS, "inserted");
5670            assert_eq!(counters.value_created(), KEYS, "value_created");
5671            assert_eq!(counters.evicted(), eviction_count, "evicted");
5672            assert_eq!(counters.invalidated(), 0, "invalidated");
5673            assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
5674
5675            break;
5676        }
5677
5678        for key in 0..KEYS {
5679            cache.invalidate(&key).await;
5680            cache.run_pending_tasks().await;
5681        }
5682
5683        let mut retries = 0;
5684        loop {
5685            // Ensure all scheduled notifications have been processed.
5686            std::thread::sleep(Duration::from_millis(500));
5687
5688            if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
5689                if retries <= MAX_RETRIES {
5690                    retries += 1;
5691                    cache.run_pending_tasks().await;
5692                    continue;
5693                } else {
5694                    assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
5695                    assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
5696                }
5697            }
5698
5699            assert_eq!(counters.inserted(), KEYS, "inserted");
5700            assert_eq!(counters.value_created(), KEYS, "value_created");
5701            assert_eq!(counters.evicted(), eviction_count, "evicted");
5702            assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
5703            assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5704
5705            break;
5706        }
5707
5708        std::mem::drop(cache);
5709        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5710    }
5711
5712    // https://github.com/moka-rs/moka/issues/383
5713    #[tokio::test]
5714    async fn ensure_gc_runs_when_dropping_cache() {
5715        let cache = Cache::builder().build();
5716        let val = Arc::new(0);
5717        cache
5718            .get_with(1, std::future::ready(Arc::clone(&val)))
5719            .await;
5720        drop(cache);
5721        assert_eq!(Arc::strong_count(&val), 1);
5722    }
5723
5724    #[tokio::test]
5725    async fn test_debug_format() {
5726        let cache = Cache::new(10);
5727        cache.insert('a', "alice").await;
5728        cache.insert('b', "bob").await;
5729        cache.insert('c', "cindy").await;
5730
5731        let debug_str = format!("{cache:?}");
5732        assert!(debug_str.starts_with('{'));
5733        assert!(debug_str.contains(r#"'a': "alice""#));
5734        assert!(debug_str.contains(r#"'b': "bob""#));
5735        assert!(debug_str.contains(r#"'c': "cindy""#));
5736        assert!(debug_str.ends_with('}'));
5737    }
5738
5739    type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5740
5741    async fn verify_notification_vec<K, V, S>(
5742        cache: &Cache<K, V, S>,
5743        actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5744        expected: &[NotificationTuple<K, V>],
5745    ) where
5746        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5747        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5748        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5749    {
5750        // Retries will be needed when testing in a QEMU VM.
5751        const MAX_RETRIES: usize = 5;
5752        let mut retries = 0;
5753        loop {
5754            // Ensure all scheduled notifications have been processed.
5755            std::thread::sleep(Duration::from_millis(500));
5756
5757            let actual = &*actual.lock().await;
5758            if actual.len() != expected.len() {
5759                if retries <= MAX_RETRIES {
5760                    retries += 1;
5761                    cache.run_pending_tasks().await;
5762                    continue;
5763                } else {
5764                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5765                }
5766            }
5767
5768            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5769                assert_eq!(actual, expected, "expected[{i}]");
5770            }
5771
5772            break;
5773        }
5774    }
5775}