// moka/sync/cache.rs
1use super::{
2    base_cache::{BaseCache, HouseKeeperArc},
3    value_initializer::{InitResult, ValueInitializer},
4    CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector,
5};
6use crate::{
7    common::{
8        concurrent::{
9            constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp,
10        },
11        iter::ScanningGet,
12        time::{Clock, Instant},
13        HousekeeperConfig,
14    },
15    notification::EvictionListener,
16    ops::compute::{self, CompResult},
17    policy::{EvictionPolicy, ExpirationPolicy},
18    sync::{Iter, PredicateId},
19    Entry, Policy, PredicateError,
20};
21
22use crossbeam_channel::{Sender, TrySendError};
23use equivalent::Equivalent;
24use std::{
25    collections::hash_map::RandomState,
26    fmt,
27    hash::{BuildHasher, Hash},
28    sync::Arc,
29    time::Duration,
30};
31
32/// A thread-safe concurrent synchronous in-memory cache.
33///
34/// `Cache` supports full concurrency of retrievals and a high expected concurrency
35/// for updates.
36///
37/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
38/// storage. `Cache` performs a best-effort bounding of the map using an entry
39/// replacement algorithm to determine which entries to evict when the capacity is
40/// exceeded.
41///
42/// # Table of Contents
43///
44/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
45/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
46/// - [Sharing a cache across threads](#sharing-a-cache-across-threads)
47///     - [No lock is needed](#no-lock-is-needed)
48/// - [Hashing Algorithm](#hashing-algorithm)
49/// - [Example: Size-based Eviction](#example-size-based-eviction)
50/// - [Example: Time-based Expirations](#example-time-based-expirations)
51///     - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
52///     - [Per-entry expiration policy](#per-entry-expiration-policy)
53/// - [Example: Eviction Listener](#example-eviction-listener)
54///     - [You should avoid eviction listener to
55///       panic](#you-should-avoid-eviction-listener-to-panic)
56///
57/// # Example: `insert`, `get` and `invalidate`
58///
59/// Cache entries are manually added using [`insert`](#method.insert) or
60/// [`get_with`](#method.get_with) methods, and are stored in the cache until either
61/// evicted or manually invalidated.
62///
63/// Here's an example of reading and updating a cache by using multiple threads:
64///
65/// ```rust
66/// use moka::sync::Cache;
67///
68/// use std::thread;
69///
70/// fn value(n: usize) -> String {
71///     format!("value {n}")
72/// }
73///
74/// const NUM_THREADS: usize = 16;
75/// const NUM_KEYS_PER_THREAD: usize = 64;
76///
77/// // Create a cache that can store up to 10,000 entries.
78/// let cache = Cache::new(10_000);
79///
80/// // Spawn threads and read and update the cache simultaneously.
81/// let threads: Vec<_> = (0..NUM_THREADS)
82///     .map(|i| {
83///         // To share the same cache across the threads, clone it.
84///         // This is a cheap operation.
85///         let my_cache = cache.clone();
86///         let start = i * NUM_KEYS_PER_THREAD;
87///         let end = (i + 1) * NUM_KEYS_PER_THREAD;
88///
89///         thread::spawn(move || {
90///             // Insert 64 entries. (NUM_KEYS_PER_THREAD = 64)
91///             for key in start..end {
92///                 my_cache.insert(key, value(key));
93///                 // get() returns Option<String>, a clone of the stored value.
94///                 assert_eq!(my_cache.get(&key), Some(value(key)));
95///             }
96///
///             // Invalidate every 4th element of the inserted entries.
98///             for key in (start..end).step_by(4) {
99///                 my_cache.invalidate(&key);
100///             }
101///         })
102///     })
103///     .collect();
104///
105/// // Wait for all threads to complete.
106/// threads.into_iter().for_each(|t| t.join().expect("Failed"));
107///
108/// // Verify the result.
109/// for key in 0..(NUM_THREADS * NUM_KEYS_PER_THREAD) {
110///     if key % 4 == 0 {
111///         assert_eq!(cache.get(&key), None);
112///     } else {
113///         assert_eq!(cache.get(&key), Some(value(key)));
114///     }
115/// }
116/// ```
117///
118/// If you want to atomically initialize and insert a value when the key is not
119/// present, you might want to check other insertion methods
120/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
121///
122/// # Avoiding to clone the value at `get`
123///
124/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
125/// time `get` is called for an existing key, it creates a clone of the stored value
126/// `V` and returns it. This is because the `Cache` allows concurrent updates from
127/// threads so a value stored in the cache can be dropped or replaced at any time by
128/// any other thread. `get` cannot return a reference `&V` as it is impossible to
129/// guarantee the value outlives the reference.
130///
131/// If you want to store values that will be expensive to clone, wrap them by
132/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
133/// thread-safe reference-counted pointer and its `clone()` method is cheap.
134///
135/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
136///
137/// # Sharing a cache across threads
138///
/// To share a cache across threads, do one of the following:
140///
141/// - Create a clone of the cache by calling its `clone` method and pass it to other
142///   thread.
/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from the
///   [once_cell][once-cell-crate] crate, and set it to a `static` variable.
145///
146/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
147/// reference-counted pointers to the internal data structures.
148///
149/// ## No lock is needed
150///
151/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
152/// by the `Cache` are considered thread-safe, and can be safely called by multiple
153/// threads at the same time. No lock is needed.
154///
155/// [once-cell-crate]: https://crates.io/crates/once_cell
156///
157/// # Hashing Algorithm
158///
159/// By default, `Cache` uses a hashing algorithm selected to provide resistance
160/// against HashDoS attacks. It will be the same one used by
161/// `std::collections::HashMap`, which is currently SipHash 1-3.
162///
163/// While SipHash's performance is very competitive for medium sized keys, other
164/// hashing algorithms will outperform it for small keys such as integers as well as
165/// large keys such as long strings. However those algorithms will typically not
166/// protect against attacks such as HashDoS.
167///
168/// The hashing algorithm can be replaced on a per-`Cache` basis using the
169/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
170/// Many alternative algorithms are available on crates.io, such as the
171/// [AHash][ahash-crate] crate.
172///
173/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
174/// [ahash-crate]: https://crates.io/crates/ahash
175///
176/// # Example: Size-based Eviction
177///
178/// ```rust
179/// use moka::sync::Cache;
180///
181/// // Evict based on the number of entries in the cache.
182/// let cache = Cache::builder()
183///     // Up to 10,000 entries.
184///     .max_capacity(10_000)
185///     // Create the cache.
186///     .build();
187/// cache.insert(1, "one".to_string());
188///
189/// // Evict based on the byte length of strings in the cache.
190/// let cache = Cache::builder()
191///     // A weigher closure takes &K and &V and returns a u32
192///     // representing the relative size of the entry.
193///     .weigher(|_key, value: &String| -> u32 {
194///         value.len().try_into().unwrap_or(u32::MAX)
195///     })
196///     // This cache will hold up to 32MiB of values.
197///     .max_capacity(32 * 1024 * 1024)
198///     .build();
199/// cache.insert(2, "two".to_string());
200/// ```
201///
202/// If your cache should not grow beyond a certain size, use the `max_capacity`
203/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
204/// will try to evict entries that have not been used recently or very often.
205///
206/// At the cache creation time, a weigher closure can be set by the `weigher` method
207/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
208/// returns a `u32` representing the relative size of the entry:
209///
/// - If the `weigher` is _not_ set, the cache will treat each entry as having
///   the same size of `1`. This means the cache will be bounded by the number
///   of entries.
212/// - If the `weigher` is set, the cache will call the weigher to calculate the
213///   weighted size (relative size) on an entry. This means the cache will be bounded
214///   by the total weighted size of entries.
215///
216/// Note that weighted sizes are not used when making eviction selections.
217///
218/// [builder-struct]: ./struct.CacheBuilder.html
219///
220/// # Example: Time-based Expirations
221///
222/// ## Cache-level TTL and TTI policies
223///
224/// `Cache` supports the following cache-level expiration policies:
225///
/// - **Time to live (TTL)**: A cached entry will be expired after the specified
///   duration has elapsed since `insert`.
/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
///   duration has elapsed since the last `get` or `insert`.
230///
/// These are cache-level expiration policies; all entries in the cache will
/// have the same TTL and/or TTI durations. If you want to set different
/// expiration durations for different entries, see the next section.
234///
235/// ```rust
236/// use moka::sync::Cache;
237/// use std::time::Duration;
238///
239/// let cache = Cache::builder()
240///     // Time to live (TTL): 30 minutes
241///     .time_to_live(Duration::from_secs(30 * 60))
242///     // Time to idle (TTI):  5 minutes
243///     .time_to_idle(Duration::from_secs( 5 * 60))
244///     // Create the cache.
245///     .build();
246///
247/// // This entry will expire after 5 minutes (TTI) if there is no get().
248/// cache.insert(0, "zero");
249///
250/// // This get() will extend the entry life for another 5 minutes.
251/// cache.get(&0);
252///
253/// // Even though we keep calling get(), the entry will expire
254/// // after 30 minutes (TTL) from the insert().
255/// ```
256///
257/// ## Per-entry expiration policy
258///
259/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
260///
261/// `Expiry` trait provides three callback methods:
262/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
263/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
264/// updated, one of these methods is called. These methods return an
265/// `Option<Duration>`, which is used as the expiration duration of the entry.
266///
267/// `Expiry` trait provides the default implementations of these methods, so you will
268/// implement only the methods you want to customize.
269///
270/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
271/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
272/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
273///
274/// ```rust
275/// use moka::{sync::Cache, Expiry};
276/// use std::time::{Duration, Instant};
277///
278/// // In this example, we will create a `sync::Cache` with `u32` as the key, and
279/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
280/// // expiration of the value, and `String` is the application data of the value.
281///
282/// /// An enum to represent the expiration of a value.
283/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
284/// pub enum Expiration {
285///     /// The value never expires.
286///     Never,
287///     /// The value expires after a short time. (5 seconds in this example)
288///     AfterShortTime,
289///     /// The value expires after a long time. (15 seconds in this example)
290///     AfterLongTime,
291/// }
292///
293/// impl Expiration {
294///     /// Returns the duration of this expiration.
295///     pub fn as_duration(&self) -> Option<Duration> {
296///         match self {
297///             Expiration::Never => None,
298///             Expiration::AfterShortTime => Some(Duration::from_secs(5)),
299///             Expiration::AfterLongTime => Some(Duration::from_secs(15)),
300///         }
301///     }
302/// }
303///
304/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
305/// /// default implementations of three callback methods `expire_after_create`,
306/// /// `expire_after_read`, and `expire_after_update`.
307/// ///
308/// /// In this example, we only override the `expire_after_create` method.
309/// pub struct MyExpiry;
310///
311/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
312///     /// Returns the duration of the expiration of the value that was just
313///     /// created.
314///     fn expire_after_create(
315///         &self,
316///         _key: &u32,
317///         value: &(Expiration, String),
318///         _current_time: Instant,
319///     ) -> Option<Duration> {
320///         let duration = value.0.as_duration();
321///         println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
322///         duration
323///     }
324/// }
325///
326/// // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
327/// // eviction listener.
328/// let expiry = MyExpiry;
329///
330/// let eviction_listener = |key, _value, cause| {
331///     println!("Evicted key {key}. Cause: {cause:?}");
332/// };
333///
334/// let cache = Cache::builder()
335///     .max_capacity(100)
336///     .expire_after(expiry)
337///     .eviction_listener(eviction_listener)
338///     .build();
339///
340/// // Insert some entries into the cache with different expirations.
341/// cache.get_with(0, || (Expiration::AfterShortTime, "a".to_string()));
342/// cache.get_with(1, || (Expiration::AfterLongTime, "b".to_string()));
343/// cache.get_with(2, || (Expiration::Never, "c".to_string()));
344///
345/// // Verify that all the inserted entries exist.
346/// assert!(cache.contains_key(&0));
347/// assert!(cache.contains_key(&1));
348/// assert!(cache.contains_key(&2));
349///
350/// // Sleep for 6 seconds. Key 0 should expire.
351/// println!("\nSleeping for 6 seconds...\n");
352/// std::thread::sleep(Duration::from_secs(6));
353/// println!("Entry count: {}", cache.entry_count());
354///
355/// // Verify that key 0 has been evicted.
356/// assert!(!cache.contains_key(&0));
357/// assert!(cache.contains_key(&1));
358/// assert!(cache.contains_key(&2));
359///
360/// // Sleep for 10 more seconds. Key 1 should expire.
361/// println!("\nSleeping for 10 seconds...\n");
362/// std::thread::sleep(Duration::from_secs(10));
363/// println!("Entry count: {}", cache.entry_count());
364///
365/// // Verify that key 1 has been evicted.
366/// assert!(!cache.contains_key(&1));
367/// assert!(cache.contains_key(&2));
368///
369/// // Manually invalidate key 2.
370/// cache.invalidate(&2);
371/// assert!(!cache.contains_key(&2));
372///
373/// println!("\nSleeping for a second...\n");
374/// std::thread::sleep(Duration::from_secs(1));
375/// println!("Entry count: {}", cache.entry_count());
376///
377/// println!("\nDone!");
378/// ```
379///
380/// # Example: Eviction Listener
381///
382/// A `Cache` can be configured with an eviction listener, a closure that is called
383/// every time there is a cache eviction. The listener takes three parameters: the
384/// key and value of the evicted entry, and the
385/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
386/// entry was evicted.
387///
388/// An eviction listener can be used to keep other data structures in sync with the
389/// cache, for example.
390///
391/// The following example demonstrates how to use an eviction listener with
392/// time-to-live expiration to manage the lifecycle of temporary files on a
393/// filesystem. The cache stores the paths of the files, and when one of them has
394/// expired, the eviction listener will be called with the path, so it can remove the
395/// file from the filesystem.
396///
397/// ```rust
398/// // Cargo.toml
399/// //
400/// // [dependencies]
401/// // anyhow = "1.0"
402/// // uuid = { version = "1.1", features = ["v4"] }
403///
404/// use moka::{sync::Cache, notification};
405///
406/// use anyhow::{anyhow, Context};
407/// use std::{
408///     fs, io,
409///     path::{Path, PathBuf},
410///     sync::{Arc, RwLock},
411///     time::Duration,
412/// };
413/// use uuid::Uuid;
414///
415/// /// The DataFileManager writes, reads and removes data files.
416/// struct DataFileManager {
417///     base_dir: PathBuf,
418///     file_count: usize,
419/// }
420///
421/// impl DataFileManager {
422///     fn new(base_dir: PathBuf) -> Self {
423///         Self {
424///             base_dir,
425///             file_count: 0,
426///         }
427///     }
428///
429///     fn write_data_file(
430///         &mut self,
431///         key: impl AsRef<str>,
432///         contents: String
433///     ) -> io::Result<PathBuf> {
434///         // Use the key as a part of the filename.
435///         let mut path = self.base_dir.to_path_buf();
436///         path.push(key.as_ref());
437///
438///         assert!(!path.exists(), "Path already exists: {path:?}");
439///
440///         // create the file at the path and write the contents to the file.
441///         fs::write(&path, contents)?;
442///         self.file_count += 1;
443///         println!("Created a data file at {path:?} (file count: {})", self.file_count);
444///         Ok(path)
445///     }
446///
447///     fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
448///         // Reads the contents of the file at the path, and return the contents.
449///         fs::read_to_string(path)
450///     }
451///
452///     fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
453///         // Remove the file at the path.
454///         fs::remove_file(path.as_ref())?;
455///         self.file_count -= 1;
456///         println!(
457///             "Removed a data file at {:?} (file count: {})",
458///             path.as_ref(),
459///             self.file_count
460///         );
461///
462///         Ok(())
463///     }
464/// }
465///
466/// fn main() -> anyhow::Result<()> {
467///     // Create an instance of the DataFileManager and wrap it with
468///     // Arc<RwLock<_>> so it can be shared across threads.
469///     let mut base_dir = std::env::temp_dir();
470///     base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
471///     println!("base_dir: {base_dir:?}");
472///     std::fs::create_dir(&base_dir)?;
473///
474///     let file_mgr = DataFileManager::new(base_dir);
475///     let file_mgr = Arc::new(RwLock::new(file_mgr));
476///
477///     let file_mgr1 = Arc::clone(&file_mgr);
478///
479///     // Create an eviction listener closure.
480///     let eviction_listener = move |k, v: PathBuf, cause| {
481///         // Try to remove the data file at the path `v`.
482///         println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
483///
484///         // Acquire the write lock of the DataFileManager. We must handle
485///         // error cases here to prevent the listener from panicking.
486///         match file_mgr1.write() {
487///             Err(_e) => {
488///                 eprintln!("The lock has been poisoned");
489///             }
490///             Ok(mut mgr) => {
491///                 // Remove the data file using the DataFileManager.
492///                 if let Err(_e) = mgr.remove_data_file(v.as_path()) {
493///                     eprintln!("Failed to remove a data file at {v:?}");
494///                 }
495///             }
496///         }
497///     };
498///
499///     // Create the cache. Set time to live for two seconds and set the
500///     // eviction listener.
501///     let cache = Cache::builder()
502///         .max_capacity(100)
503///         .time_to_live(Duration::from_secs(2))
504///         .eviction_listener(eviction_listener)
505///         .build();
506///
507///     // Insert an entry to the cache.
508///     // This will create and write a data file for the key "user1", store the
509///     // path of the file to the cache, and return it.
510///     println!("== try_get_with()");
511///     let key = "user1";
512///     let path = cache
513///         .try_get_with(key, || -> anyhow::Result<_> {
514///             let mut mgr = file_mgr
515///                 .write()
516///                 .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
517///             let path = mgr
518///                 .write_data_file(key, "user data".into())
519///                 .with_context(|| format!("Failed to create a data file"))?;
520///             Ok(path)
521///         })
522///         .map_err(|e| anyhow!("{e}"))?;
523///
524///     // Read the data file at the path and print the contents.
525///     println!("\n== read_data_file()");
526///     {
527///         let mgr = file_mgr
528///             .read()
529///             .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
530///         let contents = mgr
531///             .read_data_file(path.as_path())
532///             .with_context(|| format!("Failed to read data from {path:?}"))?;
533///         println!("contents: {contents}");
534///     }
535///
536///     // Sleep for five seconds. While sleeping, the cache entry for key "user1"
537///     // will be expired and evicted, so the eviction listener will be called to
538///     // remove the file.
539///     std::thread::sleep(Duration::from_secs(5));
540///
541///     cache.run_pending_tasks();
542///
543///     Ok(())
544/// }
545/// ```
546///
547/// ## You should avoid eviction listener to panic
548///
/// It is very important that an eviction listener closure never panics.
550/// Otherwise, the cache will stop calling the listener after a panic. This is an
551/// intended behavior because the cache cannot know whether it is memory safe or not
552/// to call the panicked listener again.
553///
554/// When a listener panics, the cache will swallow the panic and disable the
555/// listener. If you want to know when a listener panics and the reason of the panic,
556/// you can enable an optional `logging` feature of Moka and check error-level logs.
557///
/// To enable the `logging` feature, do the following:
559///
560/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
561/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
562///    `info`, ...):
563///     - If you are using the `env_logger` crate, you can achieve this by setting
564///       `RUST_LOG` environment variable to `moka=error`.
/// 3. If you have more than one cache, you may want to set a distinct name for
///    each cache by using cache builder's [`name`][builder-name-method] method.
///    The name will appear in the log.
568///
569/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
570///
pub struct Cache<K, V, S = RandomState> {
    // The shared core of the cache (hash table, eviction/expiration policies,
    // housekeeping). Cloning a `Cache` clones this cheap handle, not the
    // stored entries.
    pub(crate) base: BaseCache<K, V, S>,
    // Coordinates `get_with`-style atomic initializations so the init closure
    // for a given key is not run by multiple callers at once — presumably; see
    // the `value_initializer` module for the exact guarantee.
    value_initializer: Arc<ValueInitializer<K, V, S>>,
}
575
// SAFETY: A `Cache` handle shares its internals across clones via thread-safe
// reference-counted pointers, so a `Cache` sent to another thread can still
// access (and eventually drop) keys and values created on the original thread;
// hence `K: Send + Sync` and `V: Send + Sync`. NOTE(review): the hasher `S`
// only needs `Send` here — presumably each lookup uses its own clone of the
// hasher; confirm against `BaseCache`'s internals.
unsafe impl<K, V, S> Send for Cache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Send,
{
}
583
// SAFETY: Sharing `&Cache` between threads is sound as long as `K` and `V`
// are `Send + Sync` and the hasher `S` is `Sync`. NOTE(review): the actual
// invariant (all interior mutability behind concurrency-safe primitives)
// lives in `BaseCache` and `ValueInitializer`; re-verify this impl whenever
// their interior mutability changes.
unsafe impl<K, V, S> Sync for Cache<K, V, S>
where
    K: Send + Sync,
    V: Send + Sync,
    S: Sync,
{
}
591
592// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
593impl<K, V, S> Clone for Cache<K, V, S> {
594    /// Makes a clone of this shared cache.
595    ///
596    /// This operation is cheap as it only creates thread-safe reference counted
597    /// pointers to the shared internal data structures.
598    fn clone(&self) -> Self {
599        Self {
600            base: self.base.clone(),
601            value_initializer: Arc::clone(&self.value_initializer),
602        }
603    }
604}
605
606impl<K, V, S> fmt::Debug for Cache<K, V, S>
607where
608    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
609    V: fmt::Debug + Clone + Send + Sync + 'static,
610    // TODO: Remove these bounds from S.
611    S: BuildHasher + Clone + Send + Sync + 'static,
612{
613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614        let mut d_map = f.debug_map();
615
616        for (k, v) in self {
617            d_map.entry(&k, &v);
618        }
619
620        d_map.finish()
621    }
622}
623
impl<K, V, S> Cache<K, V, S> {
    /// Returns this cache's name, if one was set.
    pub fn name(&self) -> Option<&str> {
        self.base.name()
    }

    /// Returns a read-only cache policy of this cache.
    ///
    /// At this time, cache policy cannot be modified after cache creation.
    /// A future version may support modifying it.
    pub fn policy(&self) -> Policy {
        self.base.policy()
    }

    /// Returns an approximate number of entries in this cache.
    ///
    /// The value returned is _an estimate_; the actual count may differ if there are
    /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by performing a
    /// `run_pending_tasks` first.
    ///
    /// # Example
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache = Cache::new(10);
    /// cache.insert('n', "Netherland Dwarf");
    /// cache.insert('l', "Lop Eared");
    /// cache.insert('d', "Dutch");
    ///
    /// // Ensure an entry exists.
    /// assert!(cache.contains_key(&'n'));
    ///
    /// // However, the following lines may print stale zeros instead of threes,
    /// // because the internal bookkeeping has not necessarily caught up yet.
    /// println!("{}", cache.entry_count());   // -> 0
    /// println!("{}", cache.weighted_size()); // -> 0
    ///
    /// // To mitigate the inaccuracy, call the `run_pending_tasks` method to run
    /// // pending internal tasks.
    /// cache.run_pending_tasks();
    ///
    /// // The following lines will print the actual numbers.
    /// println!("{}", cache.entry_count());   // -> 3
    /// println!("{}", cache.weighted_size()); // -> 3
    /// ```
    ///
    pub fn entry_count(&self) -> u64 {
        self.base.entry_count()
    }

    /// Returns an approximate total weighted size of entries in this cache.
    ///
    /// The value returned is _an estimate_; the actual size may differ if there are
    /// concurrent insertions or removals, or if some entries are pending removal due
    /// to expiration. This inaccuracy can be mitigated by performing a
    /// `run_pending_tasks` first. See [`entry_count`](#method.entry_count) for
    /// sample code.
    pub fn weighted_size(&self) -> u64 {
        self.base.weighted_size()
    }
}
686
687impl<K, V> Cache<K, V, RandomState>
688where
689    K: Hash + Eq + Send + Sync + 'static,
690    V: Clone + Send + Sync + 'static,
691{
692    /// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
693    ///
694    /// To adjust various configuration knobs such as `initial_capacity` or
695    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
696    ///
697    /// [builder-struct]: ./struct.CacheBuilder.html
698    pub fn new(max_capacity: u64) -> Self {
699        let build_hasher = RandomState::default();
700        Self::with_everything(
701            None,
702            Some(max_capacity),
703            None,
704            build_hasher,
705            None,
706            EvictionPolicy::default(),
707            None,
708            ExpirationPolicy::default(),
709            HousekeeperConfig::default(),
710            false,
711            Clock::default(),
712        )
713    }
714
715    /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` or
716    /// `SegmentedCache` with various configuration knobs.
717    ///
718    /// [builder-struct]: ./struct.CacheBuilder.html
719    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
720        CacheBuilder::default()
721    }
722}
723
724impl<K, V, S> Cache<K, V, S>
725where
726    K: Hash + Eq + Send + Sync + 'static,
727    V: Clone + Send + Sync + 'static,
728    S: BuildHasher + Clone + Send + Sync + 'static,
729{
    /// Crate-internal constructor that wires up a `BaseCache` and a
    /// `ValueInitializer` from the given configuration. Every knob is
    /// forwarded to `BaseCache::new` unchanged.
    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn with_everything(
        name: Option<String>,
        max_capacity: Option<u64>,
        initial_capacity: Option<usize>,
        build_hasher: S,
        weigher: Option<Weigher<K, V>>,
        eviction_policy: EvictionPolicy,
        eviction_listener: Option<EvictionListener<K, V>>,
        expiration_policy: ExpirationPolicy<K, V>,
        housekeeper_config: HousekeeperConfig,
        invalidator_enabled: bool,
        clock: Clock,
    ) -> Self {
        Self {
            base: BaseCache::new(
                name,
                max_capacity,
                initial_capacity,
                // Cloned because the hasher is needed twice: one copy for the
                // base cache's hash table, and the original (below) for the
                // value initializer's own table.
                build_hasher.clone(),
                weigher,
                eviction_policy,
                eviction_listener,
                expiration_policy,
                housekeeper_config,
                invalidator_enabled,
                clock,
            ),
            value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
        }
    }
762
763    /// Returns `true` if the cache contains a value for the key.
764    ///
765    /// Unlike the `get` method, this method is not considered a cache read operation,
766    /// so it does not update the historic popularity estimator or reset the idle
767    /// timer for the key.
768    ///
769    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
770    /// on the borrowed form _must_ match those for the key type.
771    pub fn contains_key<Q>(&self, key: &Q) -> bool
772    where
773        Q: Equivalent<K> + Hash + ?Sized,
774    {
775        self.base.contains_key_with_hash(key, self.base.hash(key))
776    }
777
    /// Crate-internal variant of [`contains_key`](#method.contains_key) that
    /// takes a pre-computed hash, so callers that already hashed the key can
    /// avoid hashing it twice.
    pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.base.contains_key_with_hash(key, hash)
    }
784
785    /// Returns a _clone_ of the value corresponding to the key.
786    ///
787    /// If you want to store values that will be expensive to clone, wrap them by
788    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
789    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
790    ///
791    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
792    /// on the borrowed form _must_ match those for the key type.
793    ///
794    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
795    pub fn get<Q>(&self, key: &Q) -> Option<V>
796    where
797        Q: Equivalent<K> + Hash + ?Sized,
798    {
799        self.base
800            .get_with_hash(key, self.base.hash(key), false)
801            .map(Entry::into_value)
802    }
803
    /// Crate-internal variant of [`get`](#method.get) that takes a pre-computed
    /// hash and returns the whole `Entry` instead of just the value. The
    /// `need_key` flag is forwarded to `BaseCache::get_with_hash` — presumably
    /// it controls whether the returned entry also carries the key; confirm
    /// against `BaseCache`.
    pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        self.base.get_with_hash(key, hash, need_key)
    }
810
    /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
    /// select or insert an entry.
    ///
    /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
    ///
    /// # Example
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache: Cache<String, u32> = Cache::new(100);
    /// let key = "key1".to_string();
    ///
    /// let entry = cache.entry(key.clone()).or_insert(3);
    /// assert!(entry.is_fresh());
    /// assert_eq!(entry.key(), &key);
    /// assert_eq!(entry.into_value(), 3);
    ///
    /// let entry = cache.entry(key).or_insert(6);
    /// // Not fresh because the value was already in the cache.
    /// assert!(!entry.is_fresh());
    /// assert_eq!(entry.into_value(), 3);
    /// ```
    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
    where
        K: Hash + Eq,
    {
        // Compute the hash first; `key` is moved into the selector below.
        let hash = self.base.hash(&key);
        OwnedKeyEntrySelector::new(key, hash, self)
    }
841
842    /// Takes a reference `&Q` of a key and returns an [`RefKeyEntrySelector`] that
843    /// can be used to select or insert an entry.
844    ///
845    /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
846    ///
847    /// # Example
848    ///
849    /// ```rust
850    /// use moka::sync::Cache;
851    ///
852    /// let cache: Cache<String, u32> = Cache::new(100);
853    /// let key = "key1".to_string();
854    ///
855    /// let entry = cache.entry_by_ref(&key).or_insert(3);
856    /// assert!(entry.is_fresh());
857    /// assert_eq!(entry.key(), &key);
858    /// assert_eq!(entry.into_value(), 3);
859    ///
860    /// let entry = cache.entry_by_ref(&key).or_insert(6);
861    /// // Not fresh because the value was already in the cache.
862    /// assert!(!entry.is_fresh());
863    /// assert_eq!(entry.into_value(), 3);
864    /// ```
865    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
866    where
867        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
868    {
869        let hash = self.base.hash(key);
870        RefKeyEntrySelector::new(key, hash, self)
871    }
872
873    /// Returns a _clone_ of the value corresponding to the key. If the value does
874    /// not exist, evaluates the `init` closure and inserts the output.
875    ///
876    /// # Concurrent calls on the same key
877    ///
878    /// This method guarantees that concurrent calls on the same not-existing key are
879    /// coalesced into one evaluation of the `init` closure. Only one of the calls
880    /// evaluates its closure, and other calls wait for that closure to complete.
881    ///
882    /// The following code snippet demonstrates this behavior:
883    ///
884    /// ```rust
885    /// use moka::sync::Cache;
886    /// use std::{sync::Arc, thread};
887    ///
888    /// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
889    /// let cache = Cache::new(100);
890    ///
891    /// // Spawn four threads.
892    /// let threads: Vec<_> = (0..4_u8)
893    ///     .map(|task_id| {
894    ///         let my_cache = cache.clone();
895    ///         thread::spawn(move || {
896    ///             println!("Thread {task_id} started.");
897    ///
898    ///             // Try to insert and get the value for key1. Although all four
899    ///             // threads will call `get_with` at the same time, the `init` closure
900    ///             // must be evaluated only once.
901    ///             let value = my_cache.get_with("key1", || {
902    ///                 println!("Thread {task_id} inserting a value.");
903    ///                 Arc::new(vec![0u8; TEN_MIB])
904    ///             });
905    ///
906    ///             // Ensure the value exists now.
907    ///             assert_eq!(value.len(), TEN_MIB);
908    ///             assert!(my_cache.get(&"key1").is_some());
909    ///
910    ///             println!("Thread {task_id} got the value. (len: {})", value.len());
911    ///         })
912    ///     })
913    ///     .collect();
914    ///
915    /// // Wait all threads to complete.
916    /// threads
917    ///     .into_iter()
918    ///     .for_each(|t| t.join().expect("Thread failed"));
919    /// ```
920    ///
921    /// **Result**
922    ///
923    /// - The `init` closure was called exactly once by thread 1.
924    /// - Other threads were blocked until thread 1 inserted the value.
925    ///
926    /// ```console
927    /// Thread 1 started.
928    /// Thread 0 started.
929    /// Thread 3 started.
930    /// Thread 2 started.
931    /// Thread 1 inserting a value.
932    /// Thread 2 got the value. (len: 10485760)
933    /// Thread 1 got the value. (len: 10485760)
934    /// Thread 0 got the value. (len: 10485760)
935    /// Thread 3 got the value. (len: 10485760)
936    /// ```
937    ///
938    /// # Panics
939    ///
940    /// This method panics when the `init` closure has panicked. When it happens,
941    /// only the caller whose `init` closure panicked will get the panic (e.g. only
942    /// thread 1 in the above sample). If there are other calls in progress (e.g.
943    /// thread 0, 2 and 3 above), this method will restart and resolve one of the
944    /// remaining `init` closure.
945    ///
946    pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
947        let hash = self.base.hash(&key);
948        let key = Arc::new(key);
949        let replace_if = None as Option<fn(&V) -> bool>;
950        self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
951            .into_value()
952    }
953
954    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
955    /// key, you can pass a reference to the key. If the key does not exist in the
956    /// cache, the key will be cloned to create new entry in the cache.
957    pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
958    where
959        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
960    {
961        let hash = self.base.hash(key);
962        let replace_if = None as Option<fn(&V) -> bool>;
963
964        self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
965            .into_value()
966    }
967
    /// TODO: Remove this in v0.13.0.
    /// Deprecated, replaced with
    /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
    #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
    pub fn get_with_if(
        &self,
        key: K,
        init: impl FnOnce() -> V,
        replace_if: impl FnMut(&V) -> bool,
    ) -> V {
        // Hash first; `key` is moved into the `Arc` on the next line.
        let hash = self.base.hash(&key);
        let key = Arc::new(key);
        self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
            .into_value()
    }
983
984    pub(crate) fn get_or_insert_with_hash_and_fun(
985        &self,
986        key: Arc<K>,
987        hash: u64,
988        init: impl FnOnce() -> V,
989        mut replace_if: Option<impl FnMut(&V) -> bool>,
990        need_key: bool,
991    ) -> Entry<K, V> {
992        self.base
993            .get_with_hash_and_ignore_if(&*key, hash, replace_if.as_mut(), need_key)
994            .unwrap_or_else(|| self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key))
995    }
996
997    // Need to create new function instead of using the existing
998    // `get_or_insert_with_hash_and_fun`. The reason is `by_ref` function will
999    // require key reference to have `ToOwned` trait. If we modify the existing
1000    // `get_or_insert_with_hash_and_fun` function, it will require all the existing
1001    // apis that depends on it to make the `K` to have `ToOwned` trait.
1002    pub(crate) fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1003        &self,
1004        key: &Q,
1005        hash: u64,
1006        init: impl FnOnce() -> V,
1007        mut replace_if: Option<impl FnMut(&V) -> bool>,
1008        need_key: bool,
1009    ) -> Entry<K, V>
1010    where
1011        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1012    {
1013        self.base
1014            .get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key)
1015            .unwrap_or_else(|| {
1016                let key = Arc::new(key.to_owned());
1017                self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1018            })
1019    }
1020
    /// Evaluates `init` via the value initializer (which coalesces concurrent calls
    /// for the same key — see `get_with` docs), inserts the result, and returns it
    /// as an `Entry`.
    pub(crate) fn insert_with_hash_and_fun(
        &self,
        key: Arc<K>,
        hash: u64,
        init: impl FnOnce() -> V,
        mut replace_if: Option<impl FnMut(&V) -> bool>,
        need_key: bool,
    ) -> Entry<K, V> {
        // Re-check closure for the initializer: reads the current cached value
        // without recording the access (no popularity/idle-timer update).
        let get = || {
            self.base
                .get_with_hash_without_recording(&*key, hash, replace_if.as_mut())
        };
        let insert = |v| self.insert_with_hash(key.clone(), hash, v);

        // Clone the key for the returned `Entry` only when the caller asked for it.
        let k = if need_key {
            Some(Arc::clone(&key))
        } else {
            None
        };

        let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
        let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;

        match self
            .value_initializer
            .try_init_or_read(&key, type_id, get, init, insert, post_init)
        {
            InitResult::Initialized(v) => {
                // A value was inserted; flush crossbeam-epoch's buffer to help
                // reclaim retired entries promptly.
                crossbeam_epoch::pin().flush();
                Entry::new(k, v, true, false)
            }
            InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
            // The `get_with` family's `init` is infallible, so no error can occur.
            InitResult::InitErr(_) => unreachable!(),
        }
    }
1056
1057    pub(crate) fn get_or_insert_with_hash(
1058        &self,
1059        key: Arc<K>,
1060        hash: u64,
1061        init: impl FnOnce() -> V,
1062    ) -> Entry<K, V> {
1063        match self.base.get_with_hash(&*key, hash, true) {
1064            Some(entry) => entry,
1065            None => {
1066                let value = init();
1067                self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1068                Entry::new(Some(key), value, true, false)
1069            }
1070        }
1071    }
1072
1073    pub(crate) fn get_or_insert_with_hash_by_ref<Q>(
1074        &self,
1075        key: &Q,
1076        hash: u64,
1077        init: impl FnOnce() -> V,
1078    ) -> Entry<K, V>
1079    where
1080        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1081    {
1082        match self.base.get_with_hash(key, hash, true) {
1083            Some(entry) => entry,
1084            None => {
1085                let key = Arc::new(key.to_owned());
1086                let value = init();
1087                self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1088                Entry::new(Some(key), value, true, false)
1089            }
1090        }
1091    }
1092
1093    /// Returns a _clone_ of the value corresponding to the key. If the value does
1094    /// not exist, evaluates the `init` closure, and inserts the value if
1095    /// `Some(value)` was returned. If `None` was returned from the closure, this
1096    /// method does not insert a value and returns `None`.
1097    ///
1098    /// # Concurrent calls on the same key
1099    ///
1100    /// This method guarantees that concurrent calls on the same not-existing key are
1101    /// coalesced into one evaluation of the `init` closure. Only one of the calls
1102    /// evaluates its closure, and other calls wait for that closure to complete.
1103    ///
1104    /// The following code snippet demonstrates this behavior:
1105    ///
1106    /// ```rust
1107    /// use moka::sync::Cache;
1108    /// use std::{path::Path, thread};
1109    ///
1110    /// /// This function tries to get the file size in bytes.
1111    /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Option<u64> {
1112    ///     println!("get_file_size() called by thread {thread_id}.");
1113    ///     std::fs::metadata(path).ok().map(|m| m.len())
1114    /// }
1115    ///
1116    /// let cache = Cache::new(100);
1117    ///
1118    /// // Spawn four threads.
1119    /// let threads: Vec<_> = (0..4_u8)
1120    ///     .map(|thread_id| {
1121    ///         let my_cache = cache.clone();
1122    ///         thread::spawn(move || {
1123    ///             println!("Thread {thread_id} started.");
1124    ///
1125    ///             // Try to insert and get the value for key1. Although all four
1126    ///             // threads will call `optionally_get_with` at the same time,
1127    ///             // get_file_size() must be called only once.
1128    ///             let value = my_cache.optionally_get_with(
1129    ///                 "key1",
1130    ///                 || get_file_size(thread_id, "./Cargo.toml"),
1131    ///             );
1132    ///
1133    ///             // Ensure the value exists now.
1134    ///             assert!(value.is_some());
1135    ///             assert!(my_cache.get(&"key1").is_some());
1136    ///
1137    ///             println!(
1138    ///                 "Thread {thread_id} got the value. (len: {})",
1139    ///                 value.unwrap()
1140    ///             );
1141    ///         })
1142    ///     })
1143    ///     .collect();
1144    ///
1145    /// // Wait all threads to complete.
1146    /// threads
1147    ///     .into_iter()
1148    ///     .for_each(|t| t.join().expect("Thread failed"));
1149    /// ```
1150    ///
1151    /// **Result**
1152    ///
1153    /// - `get_file_size()` was called exactly once by thread 0.
1154    /// - Other threads were blocked until thread 0 inserted the value.
1155    ///
1156    /// ```console
1157    /// Thread 0 started.
1158    /// Thread 1 started.
1159    /// Thread 2 started.
1160    /// get_file_size() called by thread 0.
1161    /// Thread 3 started.
1162    /// Thread 2 got the value. (len: 1466)
1163    /// Thread 0 got the value. (len: 1466)
1164    /// Thread 1 got the value. (len: 1466)
1165    /// Thread 3 got the value. (len: 1466)
1166    /// ```
1167    ///
1168    /// # Panics
1169    ///
    /// This method panics when the `init` closure has panicked. When it happens,
    /// only the caller whose `init` closure panicked will get the panic (e.g. only
    /// thread 0 in the above sample). If there are other calls in progress (e.g.
    /// thread 1, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closure.
1175    ///
1176    pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1177    where
1178        F: FnOnce() -> Option<V>,
1179    {
1180        let hash = self.base.hash(&key);
1181        let key = Arc::new(key);
1182
1183        self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1184            .map(Entry::into_value)
1185    }
1186
1187    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1188    /// of passing an owned key, you can pass a reference to the key. If the key does
1189    /// not exist in the cache, the key will be cloned to create new entry in the
1190    /// cache.
1191    pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1192    where
1193        F: FnOnce() -> Option<V>,
1194        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1195    {
1196        let hash = self.base.hash(key);
1197        self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1198            .map(Entry::into_value)
1199    }
1200
1201    pub(super) fn get_or_optionally_insert_with_hash_and_fun<F>(
1202        &self,
1203        key: Arc<K>,
1204        hash: u64,
1205        init: F,
1206        need_key: bool,
1207    ) -> Option<Entry<K, V>>
1208    where
1209        F: FnOnce() -> Option<V>,
1210    {
1211        let entry = self.get_with_hash(&*key, hash, need_key);
1212        if entry.is_some() {
1213            return entry;
1214        }
1215
1216        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1217    }
1218
1219    pub(super) fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1220        &self,
1221        key: &Q,
1222        hash: u64,
1223        init: F,
1224        need_key: bool,
1225    ) -> Option<Entry<K, V>>
1226    where
1227        F: FnOnce() -> Option<V>,
1228        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1229    {
1230        let entry = self.get_with_hash(key, hash, need_key);
1231        if entry.is_some() {
1232            return entry;
1233        }
1234
1235        let key = Arc::new(key.to_owned());
1236        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1237    }
1238
    /// Evaluates `init` via the value initializer (coalescing concurrent calls for
    /// the same key — see `optionally_get_with` docs) and inserts the value only if
    /// the closure returned `Some(_)`; returns `None` otherwise.
    pub(super) fn optionally_insert_with_hash_and_fun<F>(
        &self,
        key: Arc<K>,
        hash: u64,
        init: F,
        need_key: bool,
    ) -> Option<Entry<K, V>>
    where
        F: FnOnce() -> Option<V>,
    {
        // Re-check closure for the initializer: reads the current cached value
        // without recording the access (no popularity/idle-timer update).
        let get = || {
            let ignore_if = None as Option<&mut fn(&V) -> bool>;
            self.base
                .get_with_hash_without_recording(&*key, hash, ignore_if)
        };
        let insert = |v| self.insert_with_hash(key.clone(), hash, v);

        // Clone the key for the returned `Entry` only when the caller asked for it.
        let k = if need_key {
            Some(Arc::clone(&key))
        } else {
            None
        };

        let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
        let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;

        match self
            .value_initializer
            .try_init_or_read(&key, type_id, get, init, insert, post_init)
        {
            InitResult::Initialized(v) => {
                crossbeam_epoch::pin().flush();
                Some(Entry::new(k, v, true, false))
            }
            InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
            // `init` returned `None`: nothing was inserted.
            InitResult::InitErr(_) => {
                crossbeam_epoch::pin().flush();
                None
            }
        }
    }
1280
1281    /// Returns a _clone_ of the value corresponding to the key. If the value does
1282    /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
1283    /// was returned. If `Err(_)` was returned from the closure, this method does not
1284    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1285    ///
1286    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1287    ///
1288    /// # Concurrent calls on the same key
1289    ///
1290    /// This method guarantees that concurrent calls on the same not-existing key are
1291    /// coalesced into one evaluation of the `init` closure (as long as these
1292    /// closures return the same error type). Only one of the calls evaluates its
1293    /// closure, and other calls wait for that closure to complete.
1294    ///
1295    /// The following code snippet demonstrates this behavior:
1296    ///
1297    /// ```rust
1298    /// use moka::sync::Cache;
1299    /// use std::{path::Path, thread};
1300    ///
1301    /// /// This function tries to get the file size in bytes.
1302    /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Result<u64, std::io::Error> {
1303    ///     println!("get_file_size() called by thread {thread_id}.");
1304    ///     Ok(std::fs::metadata(path)?.len())
1305    /// }
1306    ///
1307    /// let cache = Cache::new(100);
1308    ///
1309    /// // Spawn four threads.
1310    /// let threads: Vec<_> = (0..4_u8)
1311    ///     .map(|thread_id| {
1312    ///         let my_cache = cache.clone();
1313    ///         thread::spawn(move || {
1314    ///             println!("Thread {thread_id} started.");
1315    ///
1316    ///             // Try to insert and get the value for key1. Although all four
1317    ///             // threads will call `try_get_with` at the same time,
1318    ///             // get_file_size() must be called only once.
1319    ///             let value = my_cache.try_get_with(
1320    ///                 "key1",
1321    ///                 || get_file_size(thread_id, "./Cargo.toml"),
1322    ///             );
1323    ///
1324    ///             // Ensure the value exists now.
1325    ///             assert!(value.is_ok());
1326    ///             assert!(my_cache.get(&"key1").is_some());
1327    ///
1328    ///             println!(
1329    ///                 "Thread {thread_id} got the value. (len: {})",
1330    ///                 value.unwrap()
1331    ///             );
1332    ///         })
1333    ///     })
1334    ///     .collect();
1335    ///
1336    /// // Wait all threads to complete.
1337    /// threads
1338    ///     .into_iter()
1339    ///     .for_each(|t| t.join().expect("Thread failed"));
1340    /// ```
1341    ///
1342    /// **Result**
1343    ///
1344    /// - `get_file_size()` was called exactly once by thread 1.
1345    /// - Other threads were blocked until thread 1 inserted the value.
1346    ///
1347    /// ```console
1348    /// Thread 1 started.
1349    /// Thread 2 started.
1350    /// get_file_size() called by thread 1.
1351    /// Thread 3 started.
1352    /// Thread 0 started.
1353    /// Thread 2 got the value. (len: 1466)
1354    /// Thread 0 got the value. (len: 1466)
1355    /// Thread 1 got the value. (len: 1466)
1356    /// Thread 3 got the value. (len: 1466)
1357    /// ```
1358    ///
1359    /// # Panics
1360    ///
1361    /// This method panics when the `init` closure has panicked. When it happens,
1362    /// only the caller whose `init` closure panicked will get the panic (e.g. only
1363    /// thread 1 in the above sample). If there are other calls in progress (e.g.
1364    /// thread 0, 2 and 3 above), this method will restart and resolve one of the
1365    /// remaining `init` closure.
1366    ///
1367    pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1368    where
1369        F: FnOnce() -> Result<V, E>,
1370        E: Send + Sync + 'static,
1371    {
1372        let hash = self.base.hash(&key);
1373        let key = Arc::new(key);
1374        self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1375            .map(Entry::into_value)
1376    }
1377
1378    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1379    /// owned key, you can pass a reference to the key. If the key does not exist in
1380    /// the cache, the key will be cloned to create new entry in the cache.
1381    pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1382    where
1383        F: FnOnce() -> Result<V, E>,
1384        E: Send + Sync + 'static,
1385        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1386    {
1387        let hash = self.base.hash(key);
1388        self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1389            .map(Entry::into_value)
1390    }
1391
1392    pub(crate) fn get_or_try_insert_with_hash_and_fun<F, E>(
1393        &self,
1394        key: Arc<K>,
1395        hash: u64,
1396        init: F,
1397        need_key: bool,
1398    ) -> Result<Entry<K, V>, Arc<E>>
1399    where
1400        F: FnOnce() -> Result<V, E>,
1401        E: Send + Sync + 'static,
1402    {
1403        if let Some(entry) = self.get_with_hash(&*key, hash, need_key) {
1404            return Ok(entry);
1405        }
1406
1407        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1408    }
1409
1410    pub(crate) fn get_or_try_insert_with_hash_by_ref_and_fun<F, Q, E>(
1411        &self,
1412        key: &Q,
1413        hash: u64,
1414        init: F,
1415        need_key: bool,
1416    ) -> Result<Entry<K, V>, Arc<E>>
1417    where
1418        F: FnOnce() -> Result<V, E>,
1419        E: Send + Sync + 'static,
1420        Q: Equivalent<K> + ToOwned<Owned = K> + Hash + ?Sized,
1421    {
1422        if let Some(entry) = self.get_with_hash(key, hash, false) {
1423            return Ok(entry);
1424        }
1425
1426        let key = Arc::new(key.to_owned());
1427        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1428    }
1429
    /// Evaluates `init` via the value initializer (coalescing concurrent calls for
    /// the same key — see `try_get_with` docs) and inserts the value on `Ok(_)`;
    /// returns the error (wrapped in `Arc` by the initializer) on `Err(_)`.
    pub(crate) fn try_insert_with_hash_and_fun<F, E>(
        &self,
        key: Arc<K>,
        hash: u64,
        init: F,
        need_key: bool,
    ) -> Result<Entry<K, V>, Arc<E>>
    where
        F: FnOnce() -> Result<V, E>,
        E: Send + Sync + 'static,
    {
        // Re-check closure for the initializer: reads the current cached value
        // without recording the access (no popularity/idle-timer update).
        let get = || {
            let ignore_if = None as Option<&mut fn(&V) -> bool>;
            self.base
                .get_with_hash_without_recording(&*key, hash, ignore_if)
        };
        let insert = |v| self.insert_with_hash(key.clone(), hash, v);

        // Clone the key for the returned `Entry` only when the caller asked for it.
        let k = if need_key {
            Some(Arc::clone(&key))
        } else {
            None
        };

        let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;

        match self
            .value_initializer
            .try_init_or_read(&key, type_id, get, init, insert, post_init)
        {
            InitResult::Initialized(v) => {
                crossbeam_epoch::pin().flush();
                Ok(Entry::new(k, v, true, false))
            }
            InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
            // `init` failed: nothing was inserted; propagate the error.
            InitResult::InitErr(e) => {
                crossbeam_epoch::pin().flush();
                Err(e)
            }
        }
    }
1472
1473    /// Inserts a key-value pair into the cache.
1474    ///
1475    /// If the cache has this key present, the value is updated.
1476    pub fn insert(&self, key: K, value: V) {
1477        let hash = self.base.hash(&key);
1478        let key = Arc::new(key);
1479        self.insert_with_hash(key, hash, value);
1480    }
1481
    /// Inserts the key-value pair into the base cache and schedules the resulting
    /// write operation on the write-op channel for later maintenance processing.
    pub(crate) fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
        // When the internal map is disabled, inserting is a silent no-op.
        if self.base.is_map_disabled() {
            return;
        }

        let (op, now) = self.base.do_insert_with_hash(key, hash, value);
        let hk = self.base.housekeeper.as_ref();
        Self::schedule_write_op(
            self.base.inner.as_ref(),
            &self.base.write_op_ch,
            op,
            now,
            hk,
        )
        .expect("Failed to insert");
    }
1498
1499    pub(crate) fn compute_with_hash_and_fun<F>(
1500        &self,
1501        key: Arc<K>,
1502        hash: u64,
1503        f: F,
1504    ) -> compute::CompResult<K, V>
1505    where
1506        F: FnOnce(Option<Entry<K, V>>) -> compute::Op<V>,
1507    {
1508        let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1509        match self
1510            .value_initializer
1511            .try_compute(key, hash, self, f, post_init, true)
1512        {
1513            Ok(result) => result,
1514            Err(_) => unreachable!(),
1515        }
1516    }
1517
    /// Fallible variant of `compute_with_hash_and_fun`: `f` may return `Err(E)`,
    /// which is propagated to the caller.
    pub(crate) fn try_compute_with_hash_and_fun<F, E>(
        &self,
        key: Arc<K>,
        hash: u64,
        f: F,
    ) -> Result<compute::CompResult<K, V>, E>
    where
        F: FnOnce(Option<Entry<K, V>>) -> Result<compute::Op<V>, E>,
        E: Send + Sync + 'static,
    {
        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
        self.value_initializer
            .try_compute(key, hash, self, f, post_init, true)
    }
1532
1533    pub(crate) fn upsert_with_hash_and_fun<F>(&self, key: Arc<K>, hash: u64, f: F) -> Entry<K, V>
1534    where
1535        F: FnOnce(Option<Entry<K, V>>) -> V,
1536    {
1537        let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1538        match self
1539            .value_initializer
1540            .try_compute(key, hash, self, f, post_init, false)
1541        {
1542            Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1543            _ => unreachable!(),
1544        }
1545    }
1546
    /// Discards any cached value for the key.
    ///
    /// If you need to get the value that has been discarded, use the
    /// [`remove`](#method.remove) method instead.
    ///
    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
    /// on the borrowed form _must_ match those for the key type.
    pub fn invalidate<Q>(&self, key: &Q)
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        let hash = self.base.hash(key);
        self.invalidate_with_hash(key, hash, false);
    }
1561
1562    /// Discards any cached value for the key and returns a _clone_ of the value.
1563    ///
1564    /// If you do not need to get the value that has been discarded, use the
1565    /// [`invalidate`](#method.invalidate) method instead.
1566    ///
1567    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1568    /// on the borrowed form _must_ match those for the key type.
1569    pub fn remove<Q>(&self, key: &Q) -> Option<V>
1570    where
1571        Q: Equivalent<K> + Hash + ?Sized,
1572    {
1573        let hash = self.base.hash(key);
1574        self.invalidate_with_hash(key, hash, true)
1575    }
1576
    /// Removes the entry for the key (if any), schedules the corresponding `Remove`
    /// write op, and — when `need_value` is `true` — returns a clone of the removed
    /// value.
    pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
    where
        Q: Equivalent<K> + Hash + ?Sized,
    {
        // Lock the key for removal if blocking removal notification is enabled.
        let mut kl = None;
        let mut klg = None;
        if self.base.is_removal_notifier_enabled() {
            // To lock the key, we have to get Arc<K> for key (&Q).
            //
            // TODO: Enhance this if possible. This is rather hack now because
            // it cannot prevent race conditions like this:
            //
            // 1. We miss the key because it does not exist. So we do not lock
            //    the key.
            // 2. Somebody else (other thread) inserts the key.
            // 3. We remove the entry for the key, but without the key lock!
            //
            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
                kl = self.base.maybe_key_lock(&arc_key);
                klg = kl.as_ref().map(|kl| kl.lock());
            }
        }

        match self.base.remove_entry(key, hash) {
            None => None,
            Some(kv) => {
                let now = self.base.current_time();

                let info = kv.entry.entry_info();
                // Bump the entry generation; it is recorded in the `Remove` op
                // below.
                let entry_gen = info.incr_entry_gen();

                if self.base.is_removal_notifier_enabled() {
                    self.base.notify_invalidate(&kv.key, &kv.entry);
                }
                // Drop the locks before scheduling write op to avoid a potential
                // dead lock. (Scheduling write can do spin lock when the queue is
                // full, and queue will be drained by the housekeeping thread that
                // can lock the same key)
                std::mem::drop(klg);
                std::mem::drop(kl);

                // Clone the removed value only when the caller asked for it.
                let maybe_v = if need_value {
                    Some(kv.entry.value.clone())
                } else {
                    None
                };

                let op = WriteOp::Remove {
                    kv_entry: kv,
                    entry_gen,
                };
                let hk = self.base.housekeeper.as_ref();
                Self::schedule_write_op(
                    self.base.inner.as_ref(),
                    &self.base.write_op_ch,
                    op,
                    now,
                    hk,
                )
                .expect("Failed to remove");
                crossbeam_epoch::pin().flush();
                maybe_v
            }
        }
    }
1643
    /// Discards all cached values.
    ///
    /// This method returns immediately by just setting the current time as the
    /// invalidation time. `get` and other retrieval methods are guaranteed not to
    /// return the entries inserted before or at the invalidation time.
    ///
    /// The actual removal of the invalidated entries is done as a maintenance task
    /// driven by a user thread. For more details, see
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
    /// level documentation.
    ///
    /// Like the `invalidate` method, this method does not clear the historic
    /// popularity estimator of keys so that it retains the client activities of
    /// trying to retrieve an item.
    pub fn invalidate_all(&self) {
        // Only records the invalidation time; entries are removed lazily.
        self.base.invalidate_all();
    }
1661
    /// Discards cached values that satisfy a predicate.
    ///
    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
    /// closure is called against each cached entry inserted before or at the time
    /// when this method was called. If the closure returns `true` that entry will be
    /// evicted from the cache.
    ///
    /// This method returns immediately by not actually removing the invalidated
    /// entries. Instead, it just sets the predicate to the cache with the time when
    /// this method was called. The actual removal of the invalidated entries is done
    /// as a maintenance task driven by a user thread. For more details, see
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
    /// level documentation.
    ///
    /// Also the `get` and other retrieval methods will apply the closure to a cached
    /// entry to determine if it should have been invalidated. Therefore, it is
    /// guaranteed that these methods must not return invalidated values.
    ///
    /// Note that you must call
    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
    /// at the cache creation time as the cache needs to maintain additional internal
    /// data structures to support this method. Otherwise, calling this method will
    /// fail with a
    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
    ///
    /// Like the `invalidate` method, this method does not clear the historic
    /// popularity estimator of keys so that it retains the client activities of
    /// trying to retrieve an item.
    ///
    /// [support-invalidation-closures]:
    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
    /// [invalidation-disabled-error]:
    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
    where
        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
    {
        // Wrap the predicate in an `Arc` and delegate to the base cache.
        self.base.invalidate_entries_if(Arc::new(predicate))
    }
1701
    /// Same as [`invalidate_entries_if`](#method.invalidate_entries_if), but takes
    /// the predicate already wrapped in an `Arc`, so callers that hold one can
    /// avoid re-wrapping it.
    pub(crate) fn invalidate_entries_with_arc_fun<F>(
        &self,
        predicate: Arc<F>,
    ) -> Result<PredicateId, PredicateError>
    where
        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
    {
        self.base.invalidate_entries_if(predicate)
    }
1711
    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
    /// value.
    ///
    /// Iterators do not block concurrent reads and writes on the cache. An entry can
    /// be inserted to, invalidated or evicted from a cache while iterators are alive
    /// on the same cache.
    ///
    /// Unlike the `get` method, visiting entries via an iterator do not update the
    /// historic popularity estimator or reset idle timers for keys.
    ///
    /// # Guarantees
    ///
    /// In order to allow concurrent access to the cache, iterator's `next` method
    /// does _not_ guarantee the following:
    ///
    /// - It does not guarantee to return a key-value pair (an entry) if its key has
    ///   been inserted to the cache _after_ the iterator was created.
    ///   - Such an entry may or may not be returned depending on key's hash and
    ///     timing.
    ///
    /// and the `next` method guarantees the followings:
    ///
    /// - It guarantees not to return the same entry more than once.
    /// - It guarantees not to return an entry if it has been removed from the cache
    ///   after the iterator was created.
    ///     - Note: An entry can be removed by following reasons:
    ///         - Manually invalidated.
    ///         - Expired (e.g. time-to-live).
    ///         - Evicted as the cache capacity exceeded.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache = Cache::new(100);
    /// cache.insert("Julia", 14);
    ///
    /// let mut iter = cache.iter();
    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
    /// assert_eq!(*k, "Julia");
    /// assert_eq!(v, 14);
    ///
    /// assert!(iter.next().is_none());
    /// ```
    ///
    pub fn iter(&self) -> Iter<'_, K, V> {
        // This cache is a single (non-segmented) cache; iterate over all of
        // its internal hash-table (cht) segments.
        Iter::with_single_cache_segment(&self.base, self.num_cht_segments())
    }
1762
1763    /// Performs any pending maintenance operations needed by the cache.
1764    pub fn run_pending_tasks(&self) {
1765        if let Some(hk) = &self.base.housekeeper {
1766            hk.run_pending_tasks(&*self.base.inner);
1767        }
1768    }
1769}
1770
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    // Each element is the shared key and a clone of the stored value.
    type Item = (Arc<K>, V);

    type IntoIter = Iter<'a, K, V>;

    /// Enables `for (key, value) in &cache { ... }` by delegating to
    /// [`Cache::iter`](#method.iter).
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
1785
1786//
1787// Iterator support
1788//
// Crate-internal trait used by `Iter` to scan the cache without affecting the
// popularity estimator. All methods delegate to the base cache.
impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the number of internal hash-table (cht) segments.
    fn num_cht_segments(&self) -> usize {
        self.base.num_cht_segments()
    }

    /// A `get` variant for iteration that does not record a read in the
    /// popularity estimator.
    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
        self.base.scanning_get(key)
    }

    /// Returns the keys stored in the given cht segment, if any.
    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
        self.base.keys(cht_segment)
    }
}
1807
1808//
1809// private methods
1810//
1811impl<K, V, S> Cache<K, V, S>
1812where
1813    K: Hash + Eq + Send + Sync + 'static,
1814    V: Clone + Send + Sync + 'static,
1815    S: BuildHasher + Clone + Send + Sync + 'static,
1816{
1817    // TODO: Like future::Cache, move this method to BaseCache.
1818    #[inline]
1819    fn schedule_write_op(
1820        inner: &impl InnerSync,
1821        ch: &Sender<WriteOp<K, V>>,
1822        op: WriteOp<K, V>,
1823        now: Instant,
1824        housekeeper: Option<&HouseKeeperArc>,
1825    ) -> Result<(), TrySendError<WriteOp<K, V>>> {
1826        let mut op = op;
1827
1828        // NOTES:
1829        // - This will block when the channel is full.
1830        // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`,
1831        //   but we got a notable performance degradation.
1832        loop {
1833            BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
1834            match ch.try_send(op) {
1835                Ok(()) => break,
1836                Err(TrySendError::Full(op1)) => {
1837                    op = op1;
1838                    std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
1839                }
1840                Err(e @ TrySendError::Disconnected(_)) => return Err(e),
1841            }
1842        }
1843        Ok(())
1844    }
1845}
1846
// For unit tests.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S> {
    /// Returns `true` when the cache holds no entries.
    pub(crate) fn is_table_empty(&self) -> bool {
        self.entry_count() == 0
    }

    /// Returns `true` when no `get_with`-style initialization is waiting.
    pub(crate) fn is_waiter_map_empty(&self) -> bool {
        self.value_initializer.waiter_count() == 0
    }
}
1858
// More test-only helpers that need the full set of trait bounds.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Number of invalidation predicates currently registered.
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.base.invalidation_predicate_count()
    }

    /// Reconfigures the cache so tests can drive maintenance deterministically.
    pub(crate) fn reconfigure_for_testing(&mut self) {
        self.base.reconfigure_for_testing();
    }

    /// Returns `true` when no per-key locks (used for removal notification)
    /// remain held.
    pub(crate) fn key_locks_map_is_empty(&self) -> bool {
        self.base.key_locks_map_is_empty()
    }
}
1878
1879// To see the debug prints, run test as `cargo test -- --nocapture`
1880#[cfg(test)]
1881mod tests {
1882    use super::Cache;
1883    use crate::{
1884        common::{time::Clock, HousekeeperConfig},
1885        notification::RemovalCause,
1886        policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
1887        Expiry,
1888    };
1889
1890    use parking_lot::Mutex;
1891    use std::{
1892        convert::Infallible,
1893        sync::{
1894            atomic::{AtomicU8, Ordering},
1895            Arc,
1896        },
1897        time::{Duration, Instant as StdInstant},
1898    };
1899
    // A cache built with `max_capacity(0)` must never admit an entry, neither
    // before nor after running the pending maintenance tasks.
    #[test]
    fn max_capacity_zero() {
        let mut cache = Cache::new(0);
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(0, ());

        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).is_none());
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).is_none());
        assert_eq!(cache.entry_count(), 0)
    }
1917
    // Exercises insert/get/invalidate/remove on a small cache with the default
    // (TinyLFU) eviction policy, and checks that eviction notifications arrive
    // in the expected order. The assertion order matters: `get` calls feed the
    // popularity estimator that decides admission.
    #[test]
    fn basic_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // counts: a -> 1, b -> 1

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        // counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // counts: a -> 2, b -> 2, c -> 1

        // "d" should not be admitted because its frequency is too low.
        cache.insert("d", "david"); //   count: d -> 0
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 2

        // "d" should be admitted and "c" should be evicted
        // because d's frequency is higher than c's.
        cache.insert("d", "dennis");
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2003
    // Same flow as `basic_single_thread`, but with the LRU eviction policy:
    // a new entry is always admitted and the least-recently-used entry is
    // evicted instead. The `get` calls define the recency order noted in the
    // inline comments (LRU -> MRU).
    #[test]
    fn basic_lru_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_policy(EvictionPolicy::lru())
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // a -> b

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();
        // a -> b -> c

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // c -> a -> b

        // "d" should be admitted because the cache uses the LRU strategy.
        cache.insert("d", "david");
        // "c" is the LRU and should have be evicted.
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));
        cache.run_pending_tasks();
        // a -> b -> d

        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        // a -> d
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("david"));
        expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
        cache.run_pending_tasks();
        // a
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("e", "emily");
        cache.insert("f", "frank");
        // "a" should be evicted because it is the LRU.
        cache.insert("g", "gina");
        expected.push((Arc::new("a"), "alice", RemovalCause::Size));
        cache.run_pending_tasks();
        // e -> f -> g
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"e"), Some("emily"));
        assert_eq!(cache.get(&"f"), Some("frank"));
        assert_eq!(cache.get(&"g"), Some("gina"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"e"));
        assert!(cache.contains_key(&"f"));
        assert!(cache.contains_key(&"g"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2098
    // Verifies weigher-based (size-aware) eviction: capacity counts total
    // weight (31) rather than entry count, admission compares the candidate's
    // frequency against the aggregated frequency of its would-be victims, and
    // updating an entry to a heavier value can itself trigger eviction.
    #[test]
    fn size_aware_eviction() {
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(31)
            .weigher(weigher)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", alice);
        cache.insert("b", bob);
        assert_eq!(cache.get(&"a"), Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some(bob));
        cache.run_pending_tasks();
        // order (LRU -> MRU) and counts: a -> 1, b -> 1

        cache.insert("c", cindy);
        assert_eq!(cache.get(&"c"), Some(cindy));
        assert!(cache.contains_key(&"c"));
        // order and counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // order and counts: c -> 1, a -> 2, b -> 2

        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
        // "d" must have higher count than 3, which is the aggregated count
        // of "a" and "c".
        cache.insert("d", david); //   count: d -> 0
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 2

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 3
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 4

        // Finally "d" should be admitted by evicting "c" and "a".
        cache.insert("d", dennis);
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
        cache.insert("b", bill);
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), Some(bill));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
        cache.insert("a", alice);
        cache.insert("b", bob);
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Verify the sizes.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2221
2222    #[test]
2223    fn basic_multi_threads() {
2224        let num_threads = 4;
2225        let cache = Cache::new(100);
2226
2227        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
2228        #[allow(clippy::needless_collect)]
2229        let handles = (0..num_threads)
2230            .map(|id| {
2231                let cache = cache.clone();
2232                std::thread::spawn(move || {
2233                    cache.insert(10, format!("{id}-100"));
2234                    cache.get(&10);
2235                    cache.insert(20, format!("{id}-200"));
2236                    cache.invalidate(&10);
2237                })
2238            })
2239            .collect::<Vec<_>>();
2240
2241        handles.into_iter().for_each(|h| h.join().expect("Failed"));
2242
2243        assert!(cache.get(&10).is_none());
2244        assert!(cache.get(&20).is_some());
2245        assert!(!cache.contains_key(&10));
2246        assert!(cache.contains_key(&20));
2247    }
2248
    // Verifies that `invalidate_all` removes every entry inserted before the
    // call (with an `Explicit` removal cause) while entries inserted afterwards
    // survive.
    #[test]
    fn invalidate_all() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(cache.contains_key(&"c"));

        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The last
        // modified timestamp of the entries were updated when they were inserted.
        // https://github.com/moka-rs/moka/issues/155

        cache.invalidate_all();
        expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
        cache.run_pending_tasks();

        // Inserted after the invalidation time, so it must survive.
        cache.insert("d", "david");
        cache.run_pending_tasks();

        assert!(cache.get(&"a").is_none());
        assert!(cache.get(&"b").is_none());
        assert!(cache.get(&"c").is_none());
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
    }
2303
    // Verifies predicate-based invalidation: registered predicates only apply
    // to entries inserted before or at registration time (using a mock clock),
    // multiple predicates can be pending at once, and each predicate is dropped
    // once its invalidation run completes.
    #[test]
    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
        use std::collections::HashSet;

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .support_invalidation_closures()
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(0, "alice");
        cache.insert(1, "bob");
        cache.insert(2, "alex");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&0), Some("alice"));
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&2), Some("alex"));
        assert!(cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(cache.contains_key(&2));

        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
        assert_eq!(cache.base.invalidation_predicate_count(), 1);
        expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
        expected.push((Arc::new(2), "alex", RemovalCause::Explicit));

        mock.increment(Duration::from_secs(5)); // 10 secs from the start.

        // Inserted after the predicate was registered, so it must not be
        // invalidated even though its value matches the predicate.
        cache.insert(3, "alice");

        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
        cache.run_pending_tasks(); // To submit the invalidation task.
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); // To process the task result.
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&0).is_none());
        assert!(cache.get(&2).is_none());
        assert_eq!(cache.get(&1), Some("bob"));
        // This should survive as it was inserted after calling invalidate_entries_if.
        assert_eq!(cache.get(&3), Some("alice"));

        assert!(!cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(!cache.contains_key(&2));
        assert!(cache.contains_key(&3));

        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        mock.increment(Duration::from_secs(5)); // 15 secs from the start.

        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
        assert_eq!(cache.invalidation_predicate_count(), 2);
        // key 1 was inserted before key 3.
        expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
        expected.push((Arc::new(3), "alice", RemovalCause::Explicit));

        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
        cache.run_pending_tasks(); // To submit the invalidation task.
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); // To process the task result.
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&1).is_none());
        assert!(cache.get(&3).is_none());

        assert!(!cache.contains_key(&1));
        assert!(!cache.contains_key(&3));

        assert_eq!(cache.entry_count(), 0);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        verify_notification_vec(&cache, actual, &expected);

        Ok(())
    }
2403
    /// Verifies the cache-level time-to-live (TTL) policy: an entry expires
    /// exactly `time_to_live` after it was inserted or replaced, and the
    /// eviction listener receives an `Expired` notification for it.
    /// Replacing a value resets the TTL clock and fires `Replaced`.
    #[test]
    fn time_to_live() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .time_to_live(Duration::from_secs(10))
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        // Reading does not extend TTL; "a" is still within its 10-sec window.
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
        // At the 10-sec mark the entry is expired and must not be visible,
        // even before the eviction task physically removes it.
        assert_eq!(cache.get(&"a"), None);
        assert!(!cache.contains_key(&"a"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        cache.insert("b", "bob");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 15 secs.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // Replacing the value resets the entry's TTL clock (now 15 secs).
        cache.insert("b", "bill");
        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 20 secs
        cache.run_pending_tasks();

        // Only 5 secs since the replacement, so "bill" is still alive.
        assert_eq!(cache.get(&"b"), Some("bill"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 25 secs
        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        verify_notification_vec(&cache, actual, &expected);
    }
2485
    /// Verifies the cache-level time-to-idle (TTI) policy: an entry expires
    /// once `time_to_idle` has elapsed since it was last read or written.
    /// A `get` resets the idle timer; `contains_key` deliberately does not.
    #[test]
    fn time_to_idle() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .time_to_idle(Duration::from_secs(10))
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        // This read resets "a"'s idle timer to the 5-sec mark.
        assert_eq!(cache.get(&"a"), Some("alice"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        cache.run_pending_tasks();

        cache.insert("b", "bob");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(2)); // 12 secs.
        cache.run_pending_tasks();

        // contains_key does not reset the idle timer for the key.
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(3)); // 15 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));

        // "a" was last read at 5 secs, so at 15 secs it has been idle for the
        // full 10 secs and is expired; "b" (inserted at 10 secs) is alive.
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 1);

        cache.run_pending_tasks();
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(10)); // 25 secs
        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        verify_notification_vec(&cache, actual, &expected);
    }
2564
    // https://github.com/moka-rs/moka/issues/359
    //
    // Regression test: a `get` must refresh the entry's last-access time
    // immediately, not only when `run_pending_tasks` processes the read log.
    // Before the fix, the second read (at t=6s) could still observe the stale
    // access time from t=0 and wrongly treat the entry as idle-expired.
    #[test]
    fn ensure_access_time_is_updated_immediately_after_read() {
        let (clock, mock) = Clock::mock();
        let mut cache = Cache::builder()
            .max_capacity(10)
            .time_to_idle(Duration::from_secs(5))
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(1, 1);

        // t = 4s: read just inside the 5-sec idle window; must reset the timer.
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&1), Some(1));

        // t = 6s: only 2 secs since the last read, so the entry must survive,
        // both before and after the pending tasks are processed.
        mock.increment(Duration::from_secs(2));
        assert_eq!(cache.get(&1), Some(1));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&1), Some(1));
    }
2589
    /// Emulates a 10-sec time-to-live policy using a custom `Expiry` whose
    /// `expire_after_create` and `expire_after_update` hooks both return a
    /// fixed 10-sec duration. Verifies the same expiration behavior as the
    /// cache-level TTL test, plus the exact number of times each hook runs
    /// (via `ExpiryCallCounters`).
    #[test]
    fn time_to_live_by_expiry_type() {
        // Define an expiry type.
        struct MyExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl MyExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for MyExpiry {
            // Called for new entries; counts the call and sets a 10-sec expiry.
            fn expire_after_create(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_creations();
                Some(Duration::from_secs(10))
            }

            // Called when an existing entry's value is replaced; counts the
            // call and restarts the 10-sec expiry, emulating TTL semantics.
            fn expire_after_update(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
            ) -> Option<Duration> {
                self.counters.incl_actual_updates();
                Some(Duration::from_secs(10))
            }
        }

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));

        mock.increment(Duration::from_secs(5)); // 10 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
        assert_eq!(cache.get(&"a"), None);
        assert!(!cache.contains_key(&"a"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        cache.insert("b", "bob");
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 15 secs.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        // Replacing the value goes through `expire_after_update`, which
        // restarts the 10-sec expiry from the 15-sec mark.
        cache.insert("b", "bill");
        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
        expiry_counters.incl_expected_updates();
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 20 secs
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"b"), Some("bill"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(5)); // 25 secs
        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        expiry_counters.verify();
        verify_notification_vec(&cache, actual, &expected);
    }
2713
    /// Emulates a 10-sec time-to-idle policy using a custom `Expiry` whose
    /// `expire_after_read` hook returns a fixed 10-sec duration (restarting
    /// the expiry on every read). Verifies the same expiration behavior as
    /// the cache-level TTI test, plus the exact number of read-hook calls.
    #[test]
    fn time_to_idle_by_expiry_type() {
        // Define an expiry type.
        struct MyExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl MyExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for MyExpiry {
            // Called on every cache hit; counts the call and restarts the
            // 10-sec expiry, emulating TTI semantics.
            fn expire_after_read(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
                _last_modified_at: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_reads();
                Some(Duration::from_secs(10))
            }
        }

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        // This hit fires `expire_after_read`, restarting "a"'s expiry.
        assert_eq!(cache.get(&"a"), Some("alice"));
        expiry_counters.incl_expected_reads();

        mock.increment(Duration::from_secs(5)); // 10 secs.
        cache.run_pending_tasks();

        cache.insert("b", "bob");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(2)); // 12 secs.
        cache.run_pending_tasks();

        // contains_key does not reset the idle timer for the key.
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 2);

        mock.increment(Duration::from_secs(3)); // 15 secs.
        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));

        // "a" expires (10 secs since its last read at the 5-sec mark); the
        // expired-key miss does not fire `expire_after_read`, but the hit on
        // "b" does.
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some("bob"));
        expiry_counters.incl_expected_reads();
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 1);

        cache.run_pending_tasks();
        assert_eq!(cache.entry_count(), 1);

        mock.increment(Duration::from_secs(10)); // 25 secs
        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));

        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));

        assert_eq!(cache.iter().count(), 0);

        cache.run_pending_tasks();
        assert!(cache.is_table_empty());

        expiry_counters.verify();
        verify_notification_vec(&cache, actual, &expected);
    }
2824
    /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
    /// only when the key was already present in the cache.
    ///
    /// `expire_after_create()` must be called when `get_with` evaluates its init
    /// closure (key absent), and `expire_after_update()` must never be called,
    /// because `get_with` never replaces an existing value.
    #[test]
    fn test_expiry_using_get_with() {
        // Define an expiry type, which always return `None`.
        struct NoExpiry {
            counters: Arc<ExpiryCallCounters>,
        }

        impl NoExpiry {
            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
                Self { counters }
            }
        }

        impl Expiry<&str, &str> for NoExpiry {
            // Counts creations; `None` means the entry never expires.
            fn expire_after_create(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_creations();
                None
            }

            // Counts reads; `None` keeps the "no expiration" state.
            fn expire_after_read(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
                _last_modified_at: StdInstant,
            ) -> Option<Duration> {
                self.counters.incl_actual_reads();
                None
            }

            // `get_with` must never take the update path.
            fn expire_after_update(
                &self,
                _key: &&str,
                _value: &&str,
                _current_time: StdInstant,
                _current_duration: Option<Duration>,
            ) -> Option<Duration> {
                unreachable!("The `expire_after_update()` method should not be called.");
            }
        }

        // Create expiry counters and the expiry.
        let expiry_counters = Arc::new(ExpiryCallCounters::default());
        let expiry = NoExpiry::new(Arc::clone(&expiry_counters));

        // Create a cache with the expiry and eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        // The key is not present, so the init closure runs and the entry is
        // created -> expire_after_create.
        cache.get_with("a", || "alice");
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks();

        // The key is present, so the init closure is skipped and the cached
        // value is read -> expire_after_read.
        cache.get_with("a", || "alex");
        expiry_counters.incl_expected_reads();
        cache.run_pending_tasks();

        // The key is not present (just invalidated) -> expire_after_create
        // again, not expire_after_update.
        cache.invalidate("a");
        cache.get_with("a", || "amanda");
        expiry_counters.incl_expected_creations();
        cache.run_pending_tasks();

        expiry_counters.verify();
    }
2906
    /// Test that returning `None` from `expire_after_update` properly clears
    /// expiration for an already expired entry.
    ///
    /// Expected behavior: After `expire_after_update` returns `None`, the entry
    /// should become accessible via `get()`.
    #[test]
    fn expire_after_update_none_on_expired_entry() {
        use std::sync::atomic::{AtomicBool, Ordering};

        // A flag to control whether the entry should expire immediately
        let should_expire = Arc::new(AtomicBool::new(true));

        struct TestExpiry {
            should_expire: Arc<AtomicBool>,
        }

        impl crate::Expiry<String, String> for TestExpiry {
            fn expire_after_create(
                &self,
                _key: &String,
                _value: &String,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                if self.should_expire.load(Ordering::SeqCst) {
                    // Expire immediately
                    Some(Duration::ZERO)
                } else {
                    // No expiration
                    None
                }
            }

            fn expire_after_update(
                &self,
                _key: &String,
                _value: &String,
                _current_time: StdInstant,
                _duration_until_expiry: Option<Duration>,
            ) -> Option<Duration> {
                if self.should_expire.load(Ordering::SeqCst) {
                    Some(Duration::ZERO)
                } else {
                    // According to docs, None means "no expiration"
                    // This should make the entry accessible again
                    None
                }
            }
        }

        let expiry = TestExpiry {
            should_expire: Arc::clone(&should_expire),
        };

        // NOTE: No mock clock and no `reconfigure_for_testing()` here; this
        // test relies only on Duration::ZERO vs. None, so real time is fine.
        let cache: Cache<String, String> = Cache::builder()
            .max_capacity(100)
            .expire_after(expiry)
            .build();

        let key = "test_key".to_string();

        // Insert entry that expires immediately
        cache.insert(key.clone(), "first_value".to_string());
        cache.run_pending_tasks();

        // Entry should exist but be expired
        assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");
        assert_eq!(
            cache.get(&key),
            None,
            "Entry should not be accessible (expired)"
        );
        cache.run_pending_tasks();

        // Now update the entry to NOT expire (expire_after_update returns None)
        should_expire.store(false, Ordering::SeqCst);
        cache.insert(key.clone(), "second_value".to_string());
        cache.run_pending_tasks();

        // The entry should now be accessible since we returned None
        // (meaning "no expiration") from expire_after_update
        assert_eq!(cache.entry_count(), 1, "Entry should exist in cache");

        // Returning None from expire_after_update should clear the expiration.
        let result = cache.get(&key);
        assert_eq!(
            result,
            Some("second_value".to_string()),
            "Entry should be accessible after clearing expiration"
        );
    }
2997
    // https://github.com/moka-rs/moka/issues/575
    //
    // Regression test: with a custom `Expiry` that only overrides
    // `expire_after_create`, re-inserting an expired key should call
    // `expire_after_create` (not `expire_after_update`), matching the
    // documented behavior that `expire_after_create` handles new entries.
    //
    // Before the fix, the expired entry in the hash table caused the update
    // path to run, calling `expire_after_update` (default returns
    // `duration_until_expiry`). The `else if` branch then cleared per-entry
    // expiration, making the entry immortal (stale).
    #[test]
    fn test_expire_after_create_only_no_stale_entries() {
        use std::sync::atomic::{AtomicBool, Ordering};

        // Toggles between "1-sec expiry" and "no expiry" for new entries.
        struct TestExpiry {
            should_expire: Arc<AtomicBool>,
        }

        impl crate::Expiry<String, String> for TestExpiry {
            fn expire_after_create(
                &self,
                _key: &String,
                _value: &String,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                if self.should_expire.load(Ordering::SeqCst) {
                    Some(Duration::from_secs(1))
                } else {
                    None
                }
            }
            // expire_after_read and expire_after_update use defaults (return
            // duration_until_expiry unchanged).
        }

        let (clock, mock) = Clock::mock();
        let should_expire = Arc::new(AtomicBool::new(true));

        let mut cache: Cache<String, String> = Cache::builder()
            .max_capacity(100)
            .expire_after(TestExpiry {
                should_expire: Arc::clone(&should_expire),
            })
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        let key = "key1".to_string();

        // Insert an entry that expires in 1 second.
        cache.insert(key.clone(), "value1".to_string());
        cache.run_pending_tasks();
        assert_eq!(cache.get(&key), Some("value1".to_string()));

        // Advance time past expiration.
        mock.increment(Duration::from_secs(2));

        // Entry should be expired but still in the hash table (no
        // run_pending_tasks to evict it).
        assert_eq!(cache.get(&key), None, "Entry should be expired");

        // Re-insert with no expiration. The old entry is expired, so this
        // calls expire_after_create (not expire_after_update).
        // expire_after_create returns None (no expiry) → entry lives forever.
        should_expire.store(false, Ordering::SeqCst);
        cache.insert(key.clone(), "value2".to_string());
        cache.run_pending_tasks();

        // The entry should be accessible (expire_after_create cleared expiry).
        assert_eq!(
            cache.get(&key),
            Some("value2".to_string()),
            "Re-inserted entry with no expiry should be accessible"
        );

        // Now test: re-insert an expired key with expiration enabled.
        // Verify it gets a fresh TTL from expire_after_create.
        should_expire.store(true, Ordering::SeqCst);

        // Insert key2 with 1s expiry.
        let key2 = "key2".to_string();
        cache.insert(key2.clone(), "v1".to_string());
        cache.run_pending_tasks();
        assert_eq!(cache.get(&key2), Some("v1".to_string()));

        // Expire it.
        mock.increment(Duration::from_secs(2));
        assert_eq!(cache.get(&key2), None, "key2 should be expired");

        // Re-insert → calls expire_after_create → fresh 1s TTL.
        cache.insert(key2.clone(), "v2".to_string());
        cache.run_pending_tasks();
        assert_eq!(
            cache.get(&key2),
            Some("v2".to_string()),
            "Re-inserted key2 should be accessible with fresh TTL"
        );

        // Advance past the fresh 1s TTL.
        mock.increment(Duration::from_secs(2));
        assert_eq!(
            cache.get(&key2),
            None,
            "key2 should expire with fresh TTL from expire_after_create"
        );
    }
3105
    // https://github.com/moka-rs/moka/issues/345
    //
    // Regression test: reproduces a race where an entry's `WriteOp` is still
    // queued while the entry itself is updated and then removed from the hash
    // table. Processing the stale op must not resurrect the entry; the cache
    // must end up empty. The exact statement order below is essential.
    #[test]
    fn test_race_between_updating_entry_and_processing_its_write_ops() {
        let (clock, mock) = Clock::mock();
        let cache = Cache::builder()
            .max_capacity(2)
            .time_to_idle(Duration::from_secs(1))
            .clock(clock)
            .build();

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        cache.insert("c", "cathy"); // c1
        mock.increment(Duration::from_secs(2));

        // The following `insert` will do the followings:
        // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
        // 2. Runs the pending tasks implicitly.
        //    (1) "a" will be admitted.
        //    (2) "b" will be admitted.
        //    (3) c1 will be evicted by size constraint.
        //    (4) "a" will be evicted due to expiration.
        //    (5) "b" will be evicted due to expiration.
        // 3. Send its `WriteOp` log to the channel.
        cache.insert("c", "cindy"); // c2

        // Remove "c" (c2) from the cht.
        assert_eq!(cache.remove(&"c"), Some("cindy")); // c-remove

        mock.increment(Duration::from_secs(2));

        // The following `run_pending_tasks` will do the followings:
        // 1. Admits "c" (c2) to the cache. (Create a node in the LRU deque)
        // 2. Because of c-remove, removes c2's node from the LRU deque.
        cache.run_pending_tasks();
        assert_eq!(cache.entry_count(), 0);
    }
3143
    /// Regression test: when a key is removed and immediately re-inserted, a
    /// queued (and later rejected) `WriteOp` for the *old* `EntryInfo` must not
    /// remove the *new* entry from the hash table, even though both ops carry
    /// the same generation number. The exact op sequence below is essential.
    #[test]
    fn test_race_between_recreating_entry_and_processing_its_write_ops() {
        let cache = Cache::builder().max_capacity(2).build();

        cache.insert('a', "a");
        cache.insert('b', "b");
        cache.run_pending_tasks();

        cache.insert('c', "c1"); // (a) `EntryInfo` 1, gen: 1
        assert!(cache.remove(&'a').is_some()); // (b)
        assert!(cache.remove(&'b').is_some()); // (c)
        assert!(cache.remove(&'c').is_some()); // (d) `EntryInfo` 1, gen: 2
        cache.insert('c', "c2"); // (e) `EntryInfo` 2, gen: 1

        // Now the `write_op_ch` channel contains the following `WriteOp`s:
        //
        // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
        // - 1: (b) remove "a"
        // - 2: (c) remove "b"
        // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
        // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
        //
        // 0 for "c1" is going to be rejected because the cache is full. Let's ensure
        // processing 0 must not remove "c2" from the concurrent hash table. (Their
        // gen are the same, but `EntryInfo`s are different)
        cache.run_pending_tasks();
        assert_eq!(cache.get(&'c'), Some("c2"));
    }
3172
3173    #[test]
3174    fn test_iter() {
3175        const NUM_KEYS: usize = 50;
3176
3177        fn make_value(key: usize) -> String {
3178            format!("val: {key}")
3179        }
3180
3181        let cache = Cache::builder()
3182            .max_capacity(100)
3183            .time_to_idle(Duration::from_secs(10))
3184            .build();
3185
3186        for key in 0..NUM_KEYS {
3187            cache.insert(key, make_value(key));
3188        }
3189
3190        let mut key_set = std::collections::HashSet::new();
3191
3192        for (key, value) in &cache {
3193            assert_eq!(value, make_value(*key));
3194
3195            key_set.insert(*key);
3196        }
3197
3198        // Ensure there are no missing or duplicate keys in the iteration.
3199        assert_eq!(key_set.len(), NUM_KEYS);
3200    }
3201
3202    /// Runs 16 threads at the same time and ensures no deadlock occurs.
3203    ///
3204    /// - Eight of the threads will update key-values in the cache.
3205    /// - Eight others will iterate the cache.
3206    ///
3207    #[test]
3208    fn test_iter_multi_threads() {
3209        use std::collections::HashSet;
3210
3211        const NUM_KEYS: usize = 1024;
3212        const NUM_THREADS: usize = 16;
3213
3214        fn make_value(key: usize) -> String {
3215            format!("val: {key}")
3216        }
3217
3218        let cache = Cache::builder()
3219            .max_capacity(2048)
3220            .time_to_idle(Duration::from_secs(10))
3221            .build();
3222
3223        // Initialize the cache.
3224        for key in 0..NUM_KEYS {
3225            cache.insert(key, make_value(key));
3226        }
3227
3228        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
3229        let write_lock = rw_lock.write().unwrap();
3230
3231        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
3232        #[allow(clippy::needless_collect)]
3233        let handles = (0..NUM_THREADS)
3234            .map(|n| {
3235                let cache = cache.clone();
3236                let rw_lock = Arc::clone(&rw_lock);
3237
3238                if n % 2 == 0 {
3239                    // This thread will update the cache.
3240                    std::thread::spawn(move || {
3241                        let read_lock = rw_lock.read().unwrap();
3242                        for key in 0..NUM_KEYS {
3243                            // TODO: Update keys in a random order?
3244                            cache.insert(key, make_value(key));
3245                        }
3246                        std::mem::drop(read_lock);
3247                    })
3248                } else {
3249                    // This thread will iterate the cache.
3250                    std::thread::spawn(move || {
3251                        let read_lock = rw_lock.read().unwrap();
3252                        let mut key_set = HashSet::new();
3253                        for (key, value) in &cache {
3254                            assert_eq!(value, make_value(*key));
3255                            key_set.insert(*key);
3256                        }
3257                        // Ensure there are no missing or duplicate keys in the iteration.
3258                        assert_eq!(key_set.len(), NUM_KEYS);
3259                        std::mem::drop(read_lock);
3260                    })
3261                }
3262            })
3263            .collect::<Vec<_>>();
3264
3265        // Let these threads to run by releasing the write lock.
3266        std::mem::drop(write_lock);
3267
3268        handles.into_iter().for_each(|h| h.join().expect("Failed"));
3269
3270        // Ensure there are no missing or duplicate keys in the iteration.
3271        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
3272        assert_eq!(key_set.len(), NUM_KEYS);
3273    }
3274
3275    #[test]
3276    fn get_with() {
3277        use std::thread::{sleep, spawn};
3278
3279        let cache = Cache::new(100);
3280        const KEY: u32 = 0;
3281
3282        // This test will run five threads:
3283        //
3284        // Thread1 will be the first thread to call `get_with` for a key, so its init
3285        // closure will be evaluated and then a &str value "thread1" will be inserted
3286        // to the cache.
3287        let thread1 = {
3288            let cache1 = cache.clone();
3289            spawn(move || {
3290                // Call `get_with` immediately.
3291                let v = cache1.get_with(KEY, || {
3292                    // Wait for 300 ms and return a &str value.
3293                    sleep(Duration::from_millis(300));
3294                    "thread1"
3295                });
3296                assert_eq!(v, "thread1");
3297            })
3298        };
3299
3300        // Thread2 will be the second thread to call `get_with` for the same key, so
3301        // its init closure will not be evaluated. Once thread1's init closure
3302        // finishes, it will get the value inserted by thread1's init closure.
3303        let thread2 = {
3304            let cache2 = cache.clone();
3305            spawn(move || {
3306                // Wait for 100 ms before calling `get_with`.
3307                sleep(Duration::from_millis(100));
3308                let v = cache2.get_with(KEY, || unreachable!());
3309                assert_eq!(v, "thread1");
3310            })
3311        };
3312
3313        // Thread3 will be the third thread to call `get_with` for the same key. By
3314        // the time it calls, thread1's init closure should have finished already and
3315        // the value should be already inserted to the cache. So its init closure
3316        // will not be evaluated and will get the value insert by thread1's init
3317        // closure immediately.
3318        let thread3 = {
3319            let cache3 = cache.clone();
3320            spawn(move || {
3321                // Wait for 400 ms before calling `get_with`.
3322                sleep(Duration::from_millis(400));
3323                let v = cache3.get_with(KEY, || unreachable!());
3324                assert_eq!(v, "thread1");
3325            })
3326        };
3327
3328        // Thread4 will call `get` for the same key. It will call when thread1's init
3329        // closure is still running, so it will get none for the key.
3330        let thread4 = {
3331            let cache4 = cache.clone();
3332            spawn(move || {
3333                // Wait for 200 ms before calling `get`.
3334                sleep(Duration::from_millis(200));
3335                let maybe_v = cache4.get(&KEY);
3336                assert!(maybe_v.is_none());
3337            })
3338        };
3339
3340        // Thread5 will call `get` for the same key. It will call after thread1's init
3341        // closure finished, so it will get the value insert by thread1's init closure.
3342        let thread5 = {
3343            let cache5 = cache.clone();
3344            spawn(move || {
3345                // Wait for 400 ms before calling `get`.
3346                sleep(Duration::from_millis(400));
3347                let maybe_v = cache5.get(&KEY);
3348                assert_eq!(maybe_v, Some("thread1"));
3349            })
3350        };
3351
3352        for t in [thread1, thread2, thread3, thread4, thread5] {
3353            t.join().expect("Failed to join");
3354        }
3355
3356        assert!(cache.is_waiter_map_empty());
3357    }
3358
3359    #[test]
3360    fn get_with_by_ref() {
3361        use std::thread::{sleep, spawn};
3362
3363        let cache = Cache::new(100);
3364        const KEY: &u32 = &0;
3365
3366        // This test will run five threads:
3367        //
3368        // Thread1 will be the first thread to call `get_with_by_ref` for a key, so
3369        // its init closure will be evaluated and then a &str value "thread1" will be
3370        // inserted to the cache.
3371        let thread1 = {
3372            let cache1 = cache.clone();
3373            spawn(move || {
3374                // Call `get_with_by_ref` immediately.
3375                let v = cache1.get_with_by_ref(KEY, || {
3376                    // Wait for 300 ms and return a &str value.
3377                    sleep(Duration::from_millis(300));
3378                    "thread1"
3379                });
3380                assert_eq!(v, "thread1");
3381            })
3382        };
3383
3384        // Thread2 will be the second thread to call `get_with_by_ref` for the same
3385        // key, so its init closure will not be evaluated. Once thread1's init
3386        // closure finishes, it will get the value inserted by thread1's init
3387        // closure.
3388        let thread2 = {
3389            let cache2 = cache.clone();
3390            spawn(move || {
3391                // Wait for 100 ms before calling `get_with_by_ref`.
3392                sleep(Duration::from_millis(100));
3393                let v = cache2.get_with_by_ref(KEY, || unreachable!());
3394                assert_eq!(v, "thread1");
3395            })
3396        };
3397
3398        // Thread3 will be the third thread to call `get_with_by_ref` for the same
3399        // key. By the time it calls, thread1's init closure should have finished
3400        // already and the value should be already inserted to the cache. So its init
3401        // closure will not be evaluated and will get the value insert by thread1's
3402        // init closure immediately.
3403        let thread3 = {
3404            let cache3 = cache.clone();
3405            spawn(move || {
3406                // Wait for 400 ms before calling `get_with_by_ref`.
3407                sleep(Duration::from_millis(400));
3408                let v = cache3.get_with_by_ref(KEY, || unreachable!());
3409                assert_eq!(v, "thread1");
3410            })
3411        };
3412
3413        // Thread4 will call `get` for the same key. It will call when thread1's init
3414        // closure is still running, so it will get none for the key.
3415        let thread4 = {
3416            let cache4 = cache.clone();
3417            spawn(move || {
3418                // Wait for 200 ms before calling `get`.
3419                sleep(Duration::from_millis(200));
3420                let maybe_v = cache4.get(KEY);
3421                assert!(maybe_v.is_none());
3422            })
3423        };
3424
3425        // Thread5 will call `get` for the same key. It will call after thread1's init
3426        // closure finished, so it will get the value insert by thread1's init closure.
3427        let thread5 = {
3428            let cache5 = cache.clone();
3429            spawn(move || {
3430                // Wait for 400 ms before calling `get`.
3431                sleep(Duration::from_millis(400));
3432                let maybe_v = cache5.get(KEY);
3433                assert_eq!(maybe_v, Some("thread1"));
3434            })
3435        };
3436
3437        for t in [thread1, thread2, thread3, thread4, thread5] {
3438            t.join().expect("Failed to join");
3439        }
3440
3441        assert!(cache.is_waiter_map_empty());
3442    }
3443
    #[test]
    fn entry_or_insert_with_if() {
        use std::thread::{sleep, spawn};

        let cache = Cache::new(100);
        const KEY: u32 = 0;

        // This test will run seven threads:
        //
        // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
        // its init closure will be evaluated and then a &str value "thread1" will be
        // inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `or_insert_with_if` immediately.
                let entry = cache1.entry(KEY).or_insert_with_if(
                    || {
                        // Wait for 300 ms and return a &str value.
                        sleep(Duration::from_millis(300));
                        "thread1"
                    },
                    |_v| unreachable!(),
                );
                // Entry should be fresh because our init closure should have been
                // evaluated.
                assert!(entry.is_fresh());
                assert_eq!(entry.into_value(), "thread1");
            })
        };

        // Thread2 will be the second thread to call `or_insert_with_if` for the same
        // key, so its init closure will not be evaluated. Once thread1's init
        // closure finishes, it will get the value inserted by thread1's init
        // closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(100));
                let entry = cache2
                    .entry(KEY)
                    .or_insert_with_if(|| unreachable!(), |_v| unreachable!());
                // Entry should not be fresh because thread1's init closure should
                // have been evaluated instead of ours.
                assert!(!entry.is_fresh());
                assert_eq!(entry.into_value(), "thread1");
            })
        };

        // Thread3 will be the third thread to call `or_insert_with_if` for the same
        // key. By the time it calls, thread1's init closure should have finished
        // already and the value should be already inserted to the cache. Also
        // thread3's `replace_if` closure returns `false`. So its init closure will
        // not be evaluated and will get the value inserted by thread1's init closure
        // immediately.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 350 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(350));
                let entry = cache3.entry(KEY).or_insert_with_if(
                    || unreachable!(),
                    |v| {
                        assert_eq!(v, &"thread1");
                        false
                    },
                );
                assert!(!entry.is_fresh());
                assert_eq!(entry.into_value(), "thread1");
            })
        };

        // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
        // key. The value should have been already inserted to the cache by thread1.
        // However thread4's `replace_if` closure returns `true`. So its init closure
        // will be evaluated to replace the current value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(400));
                let entry = cache4.entry(KEY).or_insert_with_if(
                    || "thread4",
                    |v| {
                        assert_eq!(v, &"thread1");
                        true
                    },
                );
                assert!(entry.is_fresh());
                assert_eq!(entry.into_value(), "thread4");
            })
        };

        // Thread5 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache5.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread6 will call `get` for the same key. It will call after thread1's
        // init closure finished but before thread4 replaces the value, so it will
        // get the value inserted by thread1's init closure.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 350 ms before calling `get`.
                sleep(Duration::from_millis(350));
                let maybe_v = cache6.get(&KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread4's
        // init closure replaced the value, so it will get the value inserted by
        // thread4's init closure.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 450 ms before calling `get`.
                sleep(Duration::from_millis(450));
                let maybe_v = cache7.get(&KEY);
                assert_eq!(maybe_v, Some("thread4"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
        ] {
            t.join().expect("Failed to join");
        }

        assert!(cache.is_waiter_map_empty());
    }
3582
    #[test]
    fn entry_by_ref_or_insert_with_if() {
        use std::thread::{sleep, spawn};

        let cache: Cache<u32, &str> = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run seven threads:
        //
        // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
        // its init closure will be evaluated and then a &str value "thread1" will be
        // inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `or_insert_with_if` immediately.
                let v = cache1
                    .entry_by_ref(KEY)
                    .or_insert_with_if(
                        || {
                            // Wait for 300 ms and return a &str value.
                            sleep(Duration::from_millis(300));
                            "thread1"
                        },
                        |_v| unreachable!(),
                    )
                    .into_value();
                assert_eq!(v, "thread1");
            })
        };

        // Thread2 will be the second thread to call `or_insert_with_if` for the same
        // key, so its init closure will not be evaluated. Once thread1's init
        // closure finishes, it will get the value inserted by thread1's init
        // closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(100));
                let v = cache2
                    .entry_by_ref(KEY)
                    .or_insert_with_if(|| unreachable!(), |_v| unreachable!())
                    .into_value();
                assert_eq!(v, "thread1");
            })
        };

        // Thread3 will be the third thread to call `or_insert_with_if` for the same
        // key. By the time it calls, thread1's init closure should have finished
        // already and the value should be already inserted to the cache. Also
        // thread3's `replace_if` closure returns `false`. So its init closure will
        // not be evaluated and will get the value inserted by thread1's init closure
        // immediately.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 350 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(350));
                let v = cache3
                    .entry_by_ref(KEY)
                    .or_insert_with_if(
                        || unreachable!(),
                        |v| {
                            assert_eq!(v, &"thread1");
                            false
                        },
                    )
                    .into_value();
                assert_eq!(v, "thread1");
            })
        };

        // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
        // key. The value should have been already inserted to the cache by
        // thread1. However thread4's `replace_if` closure returns `true`. So its
        // init closure will be evaluated to replace the current value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `or_insert_with_if`.
                sleep(Duration::from_millis(400));
                let v = cache4
                    .entry_by_ref(KEY)
                    .or_insert_with_if(
                        || "thread4",
                        |v| {
                            assert_eq!(v, &"thread1");
                            true
                        },
                    )
                    .into_value();
                assert_eq!(v, "thread4");
            })
        };

        // Thread5 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache5.get(KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread6 will call `get` for the same key. It will call after thread1's
        // init closure finished but before thread4 replaces the value, so it will
        // get the value inserted by thread1's init closure.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 350 ms before calling `get`.
                sleep(Duration::from_millis(350));
                let maybe_v = cache6.get(KEY);
                assert_eq!(maybe_v, Some("thread1"));
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread4's
        // init closure replaced the value, so it will get the value inserted by
        // thread4's init closure.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 450 ms before calling `get`.
                sleep(Duration::from_millis(450));
                let maybe_v = cache7.get(KEY);
                assert_eq!(maybe_v, Some("thread4"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
        ] {
            t.join().expect("Failed to join");
        }

        assert!(cache.is_waiter_map_empty());
    }
3723
    #[test]
    fn try_get_with() {
        use std::{
            sync::Arc,
            thread::{sleep, spawn},
        };

        // Note that MyError does not implement std::error::Error trait like
        // anyhow::Error.
        #[derive(Debug)]
        pub struct MyError(#[allow(dead_code)] String);

        type MyResult<T> = Result<T, Arc<MyError>>;

        let cache = Cache::new(100);
        const KEY: u32 = 0;

        // This test will run eight threads:
        //
        // Thread1 will be the first thread to call `try_get_with` for a key, so its
        // init closure will be evaluated and then an error will be returned. Nothing
        // will be inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `try_get_with` immediately.
                let v = cache1.try_get_with(KEY, || {
                    // Wait for 300 ms and return an error.
                    sleep(Duration::from_millis(300));
                    Err(MyError("thread1 error".into()))
                });
                assert!(v.is_err());
            })
        };

        // Thread2 will be the second thread to call `try_get_with` for the same key,
        // so its init closure will not be evaluated. Once thread1's init closure
        // finishes, it will get the same error value returned by thread1's init
        // closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `try_get_with`.
                sleep(Duration::from_millis(100));
                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
                assert!(v.is_err());
            })
        };

        // Thread3 will be the third thread to call `try_get_with` for the same key.
        // By the time it calls, thread1's init closure should have finished already,
        // but the key still does not exist in the cache. So its init closure will be
        // evaluated and then an okay &str value will be returned. That value will be
        // inserted to the cache.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `try_get_with`.
                sleep(Duration::from_millis(400));
                let v: MyResult<_> = cache3.try_get_with(KEY, || {
                    // Wait for 300 ms and return an Ok(&str) value.
                    sleep(Duration::from_millis(300));
                    Ok("thread3")
                });
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // thread4 will be the fourth thread to call `try_get_with` for the same
        // key. So its init closure will not be evaluated. Once thread3's init
        // closure finishes, it will get the same okay &str value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 500 ms before calling `try_get_with`.
                sleep(Duration::from_millis(500));
                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread5 will be the fifth thread to call `try_get_with` for the same
        // key. So its init closure will not be evaluated. By the time it calls,
        // thread3's init closure should have finished already, so its init closure
        // will not be evaluated and will get the value inserted by thread3's init
        // closure immediately.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `try_get_with`.
                sleep(Duration::from_millis(800));
                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread6 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache6.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread1's init
        // closure finished with an error. So it will get none for the key.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400));
                let maybe_v = cache7.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread8 will call `get` for the same key. It will call after thread3's init
        // closure finished, so it will get the value inserted by thread3's init
        // closure.
        let thread8 = {
            let cache8 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `get`.
                sleep(Duration::from_millis(800));
                let maybe_v = cache8.get(&KEY);
                assert_eq!(maybe_v, Some("thread3"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
        ] {
            t.join().expect("Failed to join");
        }

        assert!(cache.is_waiter_map_empty());
    }
3864
    #[test]
    fn try_get_with_by_ref() {
        use std::{
            sync::Arc,
            thread::{sleep, spawn},
        };

        // Note that MyError does not implement std::error::Error trait like
        // anyhow::Error.
        #[derive(Debug)]
        pub struct MyError(#[allow(dead_code)] String);

        type MyResult<T> = Result<T, Arc<MyError>>;

        let cache = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run eight threads:
        //
        // Thread1 will be the first thread to call `try_get_with_by_ref` for a key,
        // so its init closure will be evaluated and then an error will be returned.
        // Nothing will be inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `try_get_with_by_ref` immediately.
                let v = cache1.try_get_with_by_ref(KEY, || {
                    // Wait for 300 ms and return an error.
                    sleep(Duration::from_millis(300));
                    Err(MyError("thread1 error".into()))
                });
                assert!(v.is_err());
            })
        };

        // Thread2 will be the second thread to call `try_get_with_by_ref` for the
        // same key, so its init closure will not be evaluated. Once thread1's init
        // closure finishes, it will get the same error value returned by thread1's
        // init closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(100));
                let v: MyResult<_> = cache2.try_get_with_by_ref(KEY, || unreachable!());
                assert!(v.is_err());
            })
        };

        // Thread3 will be the third thread to call `try_get_with_by_ref` for the
        // same key. By the time it calls, thread1's init closure should have
        // finished already, but the key still does not exist in the cache. So its
        // init closure will be evaluated and then an okay &str value will be
        // returned. That value will be inserted to the cache.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(400));
                let v: MyResult<_> = cache3.try_get_with_by_ref(KEY, || {
                    // Wait for 300 ms and return an Ok(&str) value.
                    sleep(Duration::from_millis(300));
                    Ok("thread3")
                });
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // thread4 will be the fourth thread to call `try_get_with_by_ref` for the
        // same key. So its init closure will not be evaluated. Once thread3's init
        // closure finishes, it will get the same okay &str value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 500 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(500));
                let v: MyResult<_> = cache4.try_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread5 will be the fifth thread to call `try_get_with_by_ref` for the
        // same key. So its init closure will not be evaluated. By the time it calls,
        // thread3's init closure should have finished already, so its init closure
        // will not be evaluated and will get the value inserted by thread3's init
        // closure immediately.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `try_get_with_by_ref`.
                sleep(Duration::from_millis(800));
                let v: MyResult<_> = cache5.try_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread6 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache6.get(KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread1's init
        // closure finished with an error. So it will get none for the key.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400));
                let maybe_v = cache7.get(KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread8 will call `get` for the same key. It will call after thread3's init
        // closure finished, so it will get the value inserted by thread3's init
        // closure.
        let thread8 = {
            let cache8 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `get`.
                sleep(Duration::from_millis(800));
                let maybe_v = cache8.get(KEY);
                assert_eq!(maybe_v, Some("thread3"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
        ] {
            t.join().expect("Failed to join");
        }

        assert!(cache.is_waiter_map_empty());
    }
4005
    #[test]
    fn optionally_get_with() {
        use std::thread::{sleep, spawn};

        let cache = Cache::new(100);
        const KEY: u32 = 0;

        // This test will run eight threads:
        //
        // Thread1 will be the first thread to call `optionally_get_with` for a key,
        // so its init closure will be evaluated and then `None` will be returned.
        // Nothing will be inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `optionally_get_with` immediately.
                let v = cache1.optionally_get_with(KEY, || {
                    // Wait for 300 ms and return `None`.
                    sleep(Duration::from_millis(300));
                    None
                });
                assert!(v.is_none());
            })
        };

        // Thread2 will be the second thread to call `optionally_get_with` for the
        // same key, so its init closure will not be evaluated. Once thread1's init
        // closure finishes, it will get the same `None` returned by thread1's init
        // closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `optionally_get_with`.
                sleep(Duration::from_millis(100));
                let v = cache2.optionally_get_with(KEY, || unreachable!());
                assert!(v.is_none());
            })
        };

        // Thread3 will be the third thread to call `optionally_get_with` for the
        // same key. By the time it calls, thread1's init closure should have
        // finished already, but the key still does not exist in the cache. So its
        // init closure will be evaluated and then a `Some(&str)` value will be
        // returned. That value will be inserted to the cache.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `optionally_get_with`.
                sleep(Duration::from_millis(400));
                let v = cache3.optionally_get_with(KEY, || {
                    // Wait for 300 ms and return a `Some(&str)` value.
                    sleep(Duration::from_millis(300));
                    Some("thread3")
                });
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread4 will be the fourth thread to call `optionally_get_with` for the
        // same key. So its init closure will not be evaluated. Once thread3's init
        // closure finishes, it will get the same `Some(&str)` value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 500 ms before calling `optionally_get_with`.
                sleep(Duration::from_millis(500));
                let v = cache4.optionally_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread5 will be the fifth thread to call `optionally_get_with` for the
        // same key. By the time it calls, thread3's init closure should have
        // finished already, so its init closure will not be evaluated and it will
        // get the value inserted by thread3's init closure immediately.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `optionally_get_with`.
                sleep(Duration::from_millis(800));
                let v = cache5.optionally_get_with(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread6 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache6.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread1's
        // init closure finished with `None` (so nothing was inserted). So it will
        // get none for the key.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400));
                let maybe_v = cache7.get(&KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread8 will call `get` for the same key. It will call after thread3's
        // init closure finished, so it will get the value inserted by thread3's
        // init closure.
        let thread8 = {
            let cache8 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `get`.
                sleep(Duration::from_millis(800));
                let maybe_v = cache8.get(&KEY);
                assert_eq!(maybe_v, Some("thread3"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
        ] {
            t.join().expect("Failed to join");
        }

        // All per-key waiters must have been removed once every call completed.
        assert!(cache.is_waiter_map_empty());
    }
4136
    #[test]
    fn optionally_get_with_by_ref() {
        use std::thread::{sleep, spawn};

        let cache = Cache::new(100);
        const KEY: &u32 = &0;

        // This test will run eight threads:
        //
        // Thread1 will be the first thread to call `optionally_get_with_by_ref` for
        // a key, so its init closure will be evaluated and then `None` will be
        // returned. Nothing will be inserted to the cache.
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                // Call `optionally_get_with_by_ref` immediately.
                let v = cache1.optionally_get_with_by_ref(KEY, || {
                    // Wait for 300 ms and return `None`.
                    sleep(Duration::from_millis(300));
                    None
                });
                assert!(v.is_none());
            })
        };

        // Thread2 will be the second thread to call `optionally_get_with_by_ref` for
        // the same key, so its init closure will not be evaluated. Once thread1's
        // init closure finishes, it will get the same `None` returned by thread1's
        // init closure.
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                // Wait for 100 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(100));
                let v = cache2.optionally_get_with_by_ref(KEY, || unreachable!());
                assert!(v.is_none());
            })
        };

        // Thread3 will be the third thread to call `optionally_get_with_by_ref` for
        // the same key. By the time it calls, thread1's init closure should have
        // finished already, but the key still does not exist in the cache. So its
        // init closure will be evaluated and then a `Some(&str)` value will be
        // returned. That value will be inserted to the cache.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(400));
                let v = cache3.optionally_get_with_by_ref(KEY, || {
                    // Wait for 300 ms and return a `Some(&str)` value.
                    sleep(Duration::from_millis(300));
                    Some("thread3")
                });
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread4 will be the fourth thread to call `optionally_get_with_by_ref` for
        // the same key. So its init closure will not be evaluated. Once thread3's
        // init closure finishes, it will get the same `Some(&str)` value.
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                // Wait for 500 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(500));
                let v = cache4.optionally_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread5 will be the fifth thread to call `optionally_get_with_by_ref` for
        // the same key. By the time it calls, thread3's init closure should have
        // finished already, so its init closure will not be evaluated and it will
        // get the value inserted by thread3's init closure immediately.
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `optionally_get_with_by_ref`.
                sleep(Duration::from_millis(800));
                let v = cache5.optionally_get_with_by_ref(KEY, || unreachable!());
                assert_eq!(v.unwrap(), "thread3");
            })
        };

        // Thread6 will call `get` for the same key. It will call when thread1's init
        // closure is still running, so it will get none for the key.
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                // Wait for 200 ms before calling `get`.
                sleep(Duration::from_millis(200));
                let maybe_v = cache6.get(KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread7 will call `get` for the same key. It will call after thread1's
        // init closure finished with `None` (so nothing was inserted). So it will
        // get none for the key.
        let thread7 = {
            let cache7 = cache.clone();
            spawn(move || {
                // Wait for 400 ms before calling `get`.
                sleep(Duration::from_millis(400));
                let maybe_v = cache7.get(KEY);
                assert!(maybe_v.is_none());
            })
        };

        // Thread8 will call `get` for the same key. It will call after thread3's
        // init closure finished, so it will get the value inserted by thread3's
        // init closure.
        let thread8 = {
            let cache8 = cache.clone();
            spawn(move || {
                // Wait for 800 ms before calling `get`.
                sleep(Duration::from_millis(800));
                let maybe_v = cache8.get(KEY);
                assert_eq!(maybe_v, Some("thread3"));
            })
        };

        for t in [
            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
        ] {
            t.join().expect("Failed to join");
        }

        // All per-key waiters must have been removed once every call completed.
        assert!(cache.is_waiter_map_empty());
    }
4267
4268    #[test]
4269    fn upsert_with() {
4270        use std::thread::{sleep, spawn};
4271
4272        let cache = Cache::new(100);
4273        const KEY: u32 = 0;
4274
4275        // Spawn three threads to call `and_upsert_with` for the same key and each
4276        // task increments the current value by 1. Ensure the key-level lock is
4277        // working by verifying the value is 3 after all threads finish.
4278        //
4279        // |        | thread 1 | thread 2 | thread 3 |
4280        // |--------|----------|----------|----------|
4281        // |   0 ms | get none |          |          |
4282        // | 100 ms |          | blocked  |          |
4283        // | 200 ms | insert 1 |          |          |
4284        // |        |          | get 1    |          |
4285        // | 300 ms |          |          | blocked  |
4286        // | 400 ms |          | insert 2 |          |
4287        // |        |          |          | get 2    |
4288        // | 500 ms |          |          | insert 3 |
4289
4290        let thread1 = {
4291            let cache1 = cache.clone();
4292            spawn(move || {
4293                cache1.entry(KEY).and_upsert_with(|maybe_entry| {
4294                    sleep(Duration::from_millis(200));
4295                    assert!(maybe_entry.is_none());
4296                    1
4297                })
4298            })
4299        };
4300
4301        let thread2 = {
4302            let cache2 = cache.clone();
4303            spawn(move || {
4304                sleep(Duration::from_millis(100));
4305                cache2.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
4306                    sleep(Duration::from_millis(200));
4307                    let entry = maybe_entry.expect("The entry should exist");
4308                    entry.into_value() + 1
4309                })
4310            })
4311        };
4312
4313        let thread3 = {
4314            let cache3 = cache.clone();
4315            spawn(move || {
4316                sleep(Duration::from_millis(300));
4317                cache3.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
4318                    sleep(Duration::from_millis(100));
4319                    let entry = maybe_entry.expect("The entry should exist");
4320                    entry.into_value() + 1
4321                })
4322            })
4323        };
4324
4325        let ent1 = thread1.join().expect("Thread 1 should finish");
4326        let ent2 = thread2.join().expect("Thread 2 should finish");
4327        let ent3 = thread3.join().expect("Thread 3 should finish");
4328        assert_eq!(ent1.into_value(), 1);
4329        assert_eq!(ent2.into_value(), 2);
4330        assert_eq!(ent3.into_value(), 3);
4331
4332        assert_eq!(cache.get(&KEY), Some(3));
4333
4334        assert!(cache.is_waiter_map_empty());
4335    }
4336
    #[test]
    fn compute_with() {
        use crate::ops::compute;
        use std::{
            sync::RwLock,
            thread::{sleep, spawn},
        };

        let cache = Cache::new(100);
        const KEY: u32 = 0;

        // Spawn six threads to call `and_compute_with` for the same key. Ensure the
        // key-level lock is working by verifying the value after all threads finish.
        //
        // |         |  thread 1  |   thread 2    |  thread 3  | thread 4 |  thread 5  | thread 6 |
        // |---------|------------|---------------|------------|----------|------------|----------|
        // |    0 ms | get none   |               |            |          |            |          |
        // |  100 ms |            | blocked       |            |          |            |          |
        // |  200 ms | insert [1] |               |            |          |            |          |
        // |         |            | get [1]       |            |          |            |          |
        // |  300 ms |            |               | blocked    |          |            |          |
        // |  400 ms |            | insert [1, 2] |            |          |            |          |
        // |         |            |               | get [1, 2] |          |            |          |
        // |  500 ms |            |               |            | blocked  |            |          |
        // |  600 ms |            |               | remove     |          |            |          |
        // |         |            |               |            | get none |            |          |
        // |  700 ms |            |               |            |          | blocked    |          |
        // |  800 ms |            |               |            | nop      |            |          |
        // |         |            |               |            |          | get none   |          |
        // |  900 ms |            |               |            |          |            | blocked  |
        // | 1000 ms |            |               |            |          | insert [5] |          |
        // |         |            |               |            |          |            | get [5]  |
        // | 1100 ms |            |               |            |          |            | nop      |

        // Thread1: the key is absent, so it inserts `[1]` (`Op::Put`).
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                cache1.entry(KEY).and_compute_with(|maybe_entry| {
                    sleep(Duration::from_millis(200));
                    assert!(maybe_entry.is_none());
                    compute::Op::Put(Arc::new(RwLock::new(vec![1])))
                })
            })
        };

        // Thread2: sees thread1's `[1]`, pushes `2` into the shared Vec, and puts
        // the same Arc back (`Op::Put`).
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                cache2.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
                    let entry = maybe_entry.expect("The entry should exist");
                    let value = entry.into_value();
                    assert_eq!(*value.read().unwrap(), vec![1]);
                    sleep(Duration::from_millis(200));
                    value.write().unwrap().push(2);
                    compute::Op::Put(value)
                })
            })
        };

        // Thread3: sees `[1, 2]` and removes the entry (`Op::Remove`).
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(300));
                cache3.entry(KEY).and_compute_with(|maybe_entry| {
                    let entry = maybe_entry.expect("The entry should exist");
                    let value = entry.into_value();
                    assert_eq!(*value.read().unwrap(), vec![1, 2]);
                    sleep(Duration::from_millis(200));
                    compute::Op::Remove
                })
            })
        };

        // Thread4: the key is gone (removed by thread3); does nothing (`Op::Nop`).
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                cache4.entry(KEY).and_compute_with(|maybe_entry| {
                    assert!(maybe_entry.is_none());
                    sleep(Duration::from_millis(200));
                    compute::Op::Nop
                })
            })
        };

        // Thread5: the key is still absent, so it inserts `[5]` (`Op::Put`).
        let thread5 = {
            let cache5 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(700));
                cache5.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
                    assert!(maybe_entry.is_none());
                    sleep(Duration::from_millis(200));
                    compute::Op::Put(Arc::new(RwLock::new(vec![5])))
                })
            })
        };

        // Thread6: sees thread5's `[5]` and leaves it unchanged (`Op::Nop`).
        let thread6 = {
            let cache6 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(900));
                cache6.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
                    let entry = maybe_entry.expect("The entry should exist");
                    let value = entry.into_value();
                    assert_eq!(*value.read().unwrap(), vec![5]);
                    sleep(Duration::from_millis(100));
                    compute::Op::Nop
                })
            })
        };

        let res1 = thread1.join().expect("Thread 1 should finish");
        let res2 = thread2.join().expect("Thread 2 should finish");
        let res3 = thread3.join().expect("Thread 3 should finish");
        let res4 = thread4.join().expect("Thread 4 should finish");
        let res5 = thread5.join().expect("Thread 5 should finish");
        let res6 = thread6.join().expect("Thread 6 should finish");

        // Thread1 inserted a fresh entry.
        let compute::CompResult::Inserted(entry) = res1 else {
            panic!("Expected `Inserted`. Got {res1:?}")
        };
        assert_eq!(
            *entry.into_value().read().unwrap(),
            vec![1, 2] // The same Vec was modified by thread2.
        );

        // Thread2 replaced the existing value (with the same, mutated, Arc).
        let compute::CompResult::ReplacedWith(entry) = res2 else {
            panic!("Expected `ReplacedWith`. Got {res2:?}")
        };
        assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);

        // Thread3 removed the entry; the removed value is returned.
        let compute::CompResult::Removed(entry) = res3 else {
            panic!("Expected `Removed`. Got {res3:?}")
        };
        assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);

        // Thread4 found no entry and left none.
        let compute::CompResult::StillNone(key) = res4 else {
            panic!("Expected `StillNone`. Got {res4:?}")
        };
        assert_eq!(*key, KEY);

        // Thread5 inserted a fresh entry.
        let compute::CompResult::Inserted(entry) = res5 else {
            panic!("Expected `Inserted`. Got {res5:?}")
        };
        assert_eq!(*entry.into_value().read().unwrap(), vec![5]);

        // Thread6 left the existing entry untouched.
        let compute::CompResult::Unchanged(entry) = res6 else {
            panic!("Expected `Unchanged`. Got {res6:?}")
        };
        assert_eq!(*entry.into_value().read().unwrap(), vec![5]);

        // All per-key waiters must have been removed once every call completed.
        assert!(cache.is_waiter_map_empty());
    }
4491
    #[test]
    fn try_compute_with() {
        use crate::ops::compute;
        use std::{
            sync::RwLock,
            thread::{sleep, spawn},
        };

        let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
        const KEY: u32 = 0;

        // Spawn four threads to call `and_try_compute_with` for the same key. Ensure
        // the key-level lock is working by verifying the value after all threads
        // finish.
        //
        // |         |  thread 1  |   thread 2    |  thread 3  | thread 4   |
        // |---------|------------|---------------|------------|------------|
        // |    0 ms | get none   |               |            |            |
        // |  100 ms |            | blocked       |            |            |
        // |  200 ms | insert [1] |               |            |            |
        // |         |            | get [1]       |            |            |
        // |  300 ms |            |               | blocked    |            |
        // |  400 ms |            | insert [1, 2] |            |            |
        // |         |            |               | get [1, 2] |            |
        // |  500 ms |            |               |            | blocked    |
        // |  600 ms |            |               | err        |            |
        // |         |            |               |            | get [1, 2] |
        // |  700 ms |            |               |            | remove     |
        //
        // This test is shorter than `compute_with` test because this one omits `Nop`
        // cases.

        // Thread1: the key is absent, so it inserts `[1]` (`Ok(Op::Put)`).
        let thread1 = {
            let cache1 = cache.clone();
            spawn(move || {
                cache1.entry(KEY).and_try_compute_with(|maybe_entry| {
                    sleep(Duration::from_millis(200));
                    assert!(maybe_entry.is_none());
                    Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
                })
            })
        };

        // Thread2: sees thread1's `[1]`, pushes `2` into the shared Vec, and puts
        // the same Arc back (`Ok(Op::Put)`).
        let thread2 = {
            let cache2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(100));
                cache2
                    .entry_by_ref(&KEY)
                    .and_try_compute_with(|maybe_entry| {
                        let entry = maybe_entry.expect("The entry should exist");
                        let value = entry.into_value();
                        assert_eq!(*value.read().unwrap(), vec![1]);
                        sleep(Duration::from_millis(200));
                        value.write().unwrap().push(2);
                        Ok(compute::Op::Put(value)) as Result<_, ()>
                    })
            })
        };

        // Thread3: sees `[1, 2]` but fails; the error must leave the entry intact.
        let thread3 = {
            let cache3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(300));
                cache3.entry(KEY).and_try_compute_with(|maybe_entry| {
                    let entry = maybe_entry.expect("The entry should exist");
                    let value = entry.into_value();
                    assert_eq!(*value.read().unwrap(), vec![1, 2]);
                    sleep(Duration::from_millis(200));
                    Err(())
                })
            })
        };

        // Thread4: still sees `[1, 2]` (thread3's error changed nothing) and
        // removes the entry (`Ok(Op::Remove)`).
        let thread4 = {
            let cache4 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(500));
                cache4.entry(KEY).and_try_compute_with(|maybe_entry| {
                    let entry = maybe_entry.expect("The entry should exist");
                    let value = entry.into_value();
                    assert_eq!(*value.read().unwrap(), vec![1, 2]);
                    sleep(Duration::from_millis(100));
                    Ok(compute::Op::Remove) as Result<_, ()>
                })
            })
        };

        let res1 = thread1.join().expect("Thread 1 should finish");
        let res2 = thread2.join().expect("Thread 2 should finish");
        let res3 = thread3.join().expect("Thread 3 should finish");
        let res4 = thread4.join().expect("Thread 4 should finish");

        // Thread1 inserted a fresh entry.
        let Ok(compute::CompResult::Inserted(entry)) = res1 else {
            panic!("Expected `Inserted`. Got {res1:?}")
        };
        assert_eq!(
            *entry.into_value().read().unwrap(),
            vec![1, 2] // The same Vec was modified by thread2.
        );

        // Thread2 replaced the existing value (with the same, mutated, Arc).
        let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
            panic!("Expected `ReplacedWith`. Got {res2:?}")
        };
        assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);

        // Thread3's closure error is propagated to the caller.
        assert!(res3.is_err());

        // Thread4 removed the entry; the removed value is returned.
        let Ok(compute::CompResult::Removed(entry)) = res4 else {
            panic!("Expected `Removed`. Got {res4:?}")
        };
        assert_eq!(
            *entry.into_value().read().unwrap(),
            vec![1, 2] // Removed value.
        );

        // All per-key waiters must have been removed once every call completed.
        assert!(cache.is_waiter_map_empty());
    }
4610
4611    #[test]
4612    // https://github.com/moka-rs/moka/issues/43
4613    fn handle_panic_in_get_with() {
4614        use std::{sync::Barrier, thread};
4615
4616        let cache = Cache::new(16);
4617        let barrier = Arc::new(Barrier::new(2));
4618        {
4619            let cache_ref = cache.clone();
4620            let barrier_ref = barrier.clone();
4621            thread::spawn(move || {
4622                let _ = cache_ref.get_with(1, || {
4623                    barrier_ref.wait();
4624                    thread::sleep(Duration::from_millis(50));
4625                    panic!("Panic during get_with");
4626                });
4627            });
4628        }
4629
4630        barrier.wait();
4631        assert_eq!(cache.get_with(1, || 5), 5);
4632
4633        assert!(cache.is_waiter_map_empty());
4634    }
4635
4636    #[test]
4637    // https://github.com/moka-rs/moka/issues/43
4638    fn handle_panic_in_try_get_with() {
4639        use std::{sync::Barrier, thread};
4640
4641        let cache = Cache::new(16);
4642        let barrier = Arc::new(Barrier::new(2));
4643        {
4644            let cache_ref = cache.clone();
4645            let barrier_ref = barrier.clone();
4646            thread::spawn(move || {
4647                let _ = cache_ref.try_get_with(1, || {
4648                    barrier_ref.wait();
4649                    thread::sleep(Duration::from_millis(50));
4650                    panic!("Panic during try_get_with");
4651                }) as Result<_, Arc<Infallible>>;
4652            });
4653        }
4654
4655        barrier.wait();
4656        assert_eq!(
4657            cache.try_get_with(1, || Ok(5)) as Result<_, Arc<Infallible>>,
4658            Ok(5)
4659        );
4660
4661        assert!(cache.is_waiter_map_empty());
4662    }
4663
4664    #[test]
4665    fn test_removal_notifications() {
4666        // The following `Vec`s will hold actual and expected notifications.
4667        let actual = Arc::new(Mutex::new(Vec::new()));
4668        let mut expected = Vec::new();
4669
4670        // Create an eviction listener.
4671        let a1 = Arc::clone(&actual);
4672        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
4673
4674        // Create a cache with the eviction listener.
4675        let mut cache = Cache::builder()
4676            .max_capacity(3)
4677            .eviction_listener(listener)
4678            .build();
4679        cache.reconfigure_for_testing();
4680
4681        // Make the cache exterior immutable.
4682        let cache = cache;
4683
4684        cache.insert('a', "alice");
4685        cache.invalidate(&'a');
4686        expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
4687
4688        cache.run_pending_tasks();
4689        assert_eq!(cache.entry_count(), 0);
4690
4691        cache.insert('b', "bob");
4692        cache.insert('c', "cathy");
4693        cache.insert('d', "david");
4694        cache.run_pending_tasks();
4695        assert_eq!(cache.entry_count(), 3);
4696
4697        // This will be rejected due to the size constraint.
4698        cache.insert('e', "emily");
4699        expected.push((Arc::new('e'), "emily", RemovalCause::Size));
4700        cache.run_pending_tasks();
4701        assert_eq!(cache.entry_count(), 3);
4702
4703        // Raise the popularity of 'e' so it will be accepted next time.
4704        cache.get(&'e');
4705        cache.run_pending_tasks();
4706
4707        // Retry.
4708        cache.insert('e', "eliza");
4709        // and the LRU entry will be evicted.
4710        expected.push((Arc::new('b'), "bob", RemovalCause::Size));
4711        cache.run_pending_tasks();
4712        assert_eq!(cache.entry_count(), 3);
4713
4714        // Replace an existing entry.
4715        cache.insert('d', "dennis");
4716        expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
4717        cache.run_pending_tasks();
4718        assert_eq!(cache.entry_count(), 3);
4719
4720        verify_notification_vec(&cache, actual, &expected);
4721    }
4722
    // Verifies that when an entry has already expired (by TTL or TTI) but has
    // not yet been evicted, replacing or invalidating it delivers a removal
    // notification with `RemovalCause::Expired` rather than `Replaced` or
    // `Explicit`.
    #[test]
    fn test_immediate_removal_notifications_with_updates() {
        // The following `Vec` will hold actual notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener and also TTL and TTI.
        let mut cache = Cache::builder()
            .eviction_listener(listener)
            .time_to_live(Duration::from_secs(7))
            .time_to_idle(Duration::from_secs(5))
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("alice", "a0");
        cache.run_pending_tasks();

        // Now alice (a0) has been expired by the idle timeout (TTI): 6s > 5s.
        mock.increment(Duration::from_secs(6));
        assert_eq!(cache.get(&"alice"), None);

        // We have not run the pending tasks after the expiration of alice
        // (a0), so it is still in the cache.
        assert_eq!(cache.entry_count(), 1);

        // Re-insert alice with a different value. Since alice (a0) is still
        // in the cache, this is actually a replace operation rather than an
        // insert operation. We want to verify that the RemovalCause of a0 is
        // Expired, not Replaced.
        cache.insert("alice", "a1");
        {
            let mut a = actual.lock();
            assert_eq!(a.len(), 1);
            assert_eq!(a[0], (Arc::new("alice"), "a0", RemovalCause::Expired));
            a.clear();
        }

        cache.run_pending_tasks();

        // Keep alice (a1) alive: 4s < TTI (5s), so this `get` hits.
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&"alice"), Some("a1"));
        cache.run_pending_tasks();

        // Now alice has been expired by time-to-live (TTL): 4s + 4s > 7s.
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&"alice"), None);

        // But, again, it is still in the cache.
        assert_eq!(cache.entry_count(), 1);

        // Re-insert alice with a different value and verify that the
        // RemovalCause of a1 is Expired (not Replaced).
        cache.insert("alice", "a2");
        {
            let mut a = actual.lock();
            assert_eq!(a.len(), 1);
            assert_eq!(a[0], (Arc::new("alice"), "a1", RemovalCause::Expired));
            a.clear();
        }

        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 1);

        // Now alice (a2) has been expired by the idle timeout.
        mock.increment(Duration::from_secs(6));
        assert_eq!(cache.get(&"alice"), None);
        assert_eq!(cache.entry_count(), 1);

        // This invalidate will internally remove alice (a2). Since a2 has
        // already expired, the notification must report Expired, not Explicit.
        cache.invalidate(&"alice");
        cache.run_pending_tasks();
        assert_eq!(cache.entry_count(), 0);

        {
            let mut a = actual.lock();
            assert_eq!(a.len(), 1);
            assert_eq!(a[0], (Arc::new("alice"), "a2", RemovalCause::Expired));
            a.clear();
        }

        // Re-insert, and this time, make it expired by the TTL.
        cache.insert("alice", "a3");
        cache.run_pending_tasks();
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&"alice"), Some("a3"));
        cache.run_pending_tasks();
        mock.increment(Duration::from_secs(4));
        assert_eq!(cache.get(&"alice"), None);
        assert_eq!(cache.entry_count(), 1);

        // This invalidate will internally remove alice (a3), which has already
        // expired by TTL, so again the notification must report Expired.
        cache.invalidate(&"alice");
        cache.run_pending_tasks();

        assert_eq!(cache.entry_count(), 0);

        {
            let mut a = actual.lock();
            assert_eq!(a.len(), 1);
            assert_eq!(a[0], (Arc::new("alice"), "a3", RemovalCause::Expired));
            a.clear();
        }

        // The per-key locks used for notification delivery must all be
        // released by now.
        assert!(cache.key_locks_map_is_empty());
    }
4838
    // This test ensures the key-level lock for the immediate notification
    // delivery mode is working so that the notifications for a given key
    // should always be ordered. This is true even if multiple client threads
    // try to modify the entries for the key at the same time. (This test will
    // run three client threads)
    //
    // This test is ignored by default. It becomes unstable when run in parallel
    // with other tests.
    #[test]
    #[cfg_attr(not(run_flaky_tests), ignore)]
    fn test_key_lock_used_by_immediate_removal_notifications() {
        use std::thread::{sleep, spawn};

        const KEY: &str = "alice";

        type Val = &'static str;

        // Timeline markers recorded by both the client threads and the
        // (deliberately slow) eviction listener.
        #[derive(PartialEq, Eq, Debug)]
        enum Event {
            Insert(Val),
            Invalidate(Val),
            BeginNotify(Val, RemovalCause),
            EndNotify(Val, RemovalCause),
        }

        // The following `Vec` will hold actual notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));

        // Create an eviction listener.
        // Note that this listener is slow and will take 300 ms to complete.
        let a0 = Arc::clone(&actual);
        let listener = move |_k, v, cause| {
            a0.lock().push(Event::BeginNotify(v, cause));
            sleep(Duration::from_millis(300));
            a0.lock().push(Event::EndNotify(v, cause));
        };

        // Create a cache with the eviction listener and also TTL 500 ms.
        let mut cache = Cache::builder()
            .eviction_listener(listener)
            .time_to_live(Duration::from_millis(500))
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        // - Notifications for the same key must not overlap.

        // Time  Event
        // ----- -------------------------------------
        // 0000: Insert value a0
        // 0500: a0 expired
        // 0600: Insert value a1 -> expired a0 (N-A0)
        // 0800: Insert value a2 (waiting) (A-A2)
        // 0900: N-A0 processed
        //       A-A2 finished waiting -> replace a1 (N-A1)
        // 1100: Invalidate (waiting) (R-A2)
        // 1200: N-A1 processed
        //       R-A2 finished waiting -> explicit a2 (N-A2)
        // 1500: N-A2 processed

        let expected = vec![
            Event::Insert("a0"),
            Event::Insert("a1"),
            Event::BeginNotify("a0", RemovalCause::Expired),
            Event::Insert("a2"),
            Event::EndNotify("a0", RemovalCause::Expired),
            Event::BeginNotify("a1", RemovalCause::Replaced),
            Event::Invalidate("a2"),
            Event::EndNotify("a1", RemovalCause::Replaced),
            Event::BeginNotify("a2", RemovalCause::Explicit),
            Event::EndNotify("a2", RemovalCause::Explicit),
        ];

        // 0000: Insert value a0
        actual.lock().push(Event::Insert("a0"));
        cache.insert(KEY, "a0");
        // Call `run_pending_tasks` to set the last modified time for the KEY
        // immediately, so that this entry should expire in 500 ms from now.
        cache.run_pending_tasks();

        // 0600: Insert value a1 -> expired a0 (N-A0)
        let thread1 = {
            let a1 = Arc::clone(&actual);
            let c1 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(600));
                a1.lock().push(Event::Insert("a1"));
                c1.insert(KEY, "a1");
            })
        };

        // 0800: Insert value a2 (waiting) (A-A2)
        let thread2 = {
            let a2 = Arc::clone(&actual);
            let c2 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(800));
                a2.lock().push(Event::Insert("a2"));
                c2.insert(KEY, "a2");
            })
        };

        // 1100: Invalidate (waiting) (R-A2)
        let thread3 = {
            let a3 = Arc::clone(&actual);
            let c3 = cache.clone();
            spawn(move || {
                sleep(Duration::from_millis(1100));
                a3.lock().push(Event::Invalidate("a2"));
                c3.invalidate(&KEY);
            })
        };

        for t in [thread1, thread2, thread3] {
            t.join().expect("Failed to join");
        }

        // Compare element-wise so a mismatch reports the exact index.
        let actual = actual.lock();
        assert_eq!(actual.len(), expected.len());

        for (i, (actual, expected)) in actual.iter().zip(&expected).enumerate() {
            assert_eq!(actual, expected, "expected[{i}]");
        }

        assert!(cache.key_locks_map_is_empty());
    }
4967
4968    // When the eviction listener is not set, calling `run_pending_tasks` once should
4969    // evict all entries that can be removed.
4970    #[test]
4971    fn no_batch_size_limit_on_eviction() {
4972        const MAX_CAPACITY: u64 = 20;
4973
4974        const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
4975        const MAX_LOG_SYNC_REPEATS: u32 = 1;
4976        const EVICTION_BATCH_SIZE: u32 = 1;
4977
4978        let hk_conf = HousekeeperConfig::new(
4979            // Timeout should be ignored when the eviction listener is not provided.
4980            Some(EVICTION_TIMEOUT),
4981            Some(MAX_LOG_SYNC_REPEATS),
4982            Some(EVICTION_BATCH_SIZE),
4983        );
4984
4985        // Create a cache with the LRU policy.
4986        let mut cache = Cache::builder()
4987            .max_capacity(MAX_CAPACITY)
4988            .eviction_policy(EvictionPolicy::lru())
4989            .housekeeper_config(hk_conf)
4990            .build();
4991        cache.reconfigure_for_testing();
4992
4993        // Make the cache exterior immutable.
4994        let cache = cache;
4995
4996        // Fill the cache.
4997        for i in 0..MAX_CAPACITY {
4998            let v = format!("v{i}");
4999            cache.insert(i, v)
5000        }
5001        // The max capacity should not change because we have not called
5002        // `run_pending_tasks` yet.
5003        assert_eq!(cache.entry_count(), 0);
5004
5005        cache.run_pending_tasks();
5006        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5007
5008        // Insert more items to the cache.
5009        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
5010            let v = format!("v{i}");
5011            cache.insert(i, v)
5012        }
5013        // The max capacity should not change because we have not called
5014        // `run_pending_tasks` yet.
5015        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5016        // Both old and new keys should exist.
5017        assert!(cache.contains_key(&0)); // old
5018        assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
5019        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new
5020
5021        // Process the remaining write op logs (there should be MAX_CAPACITY logs),
5022        // and evict the LRU entries.
5023        cache.run_pending_tasks();
5024        assert_eq!(cache.entry_count(), MAX_CAPACITY);
5025
5026        // Now all the old keys should be gone.
5027        assert!(!cache.contains_key(&0));
5028        assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
5029        // And the new keys should exist.
5030        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
5031    }
5032
    // Verifies that a slow eviction listener only delays eviction rather than
    // breaking it: each `run_pending_tasks` call is bounded by the housekeeper
    // eviction timeout, so the evictions are spread over several calls.
    #[test]
    fn slow_eviction_listener() {
        const MAX_CAPACITY: u64 = 20;

        const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
        const LISTENER_DELAY: Duration = Duration::from_millis(11);
        const MAX_LOG_SYNC_REPEATS: u32 = 1;
        const EVICTION_BATCH_SIZE: u32 = 1;

        let hk_conf = HousekeeperConfig::new(
            Some(EVICTION_TIMEOUT),
            Some(MAX_LOG_SYNC_REPEATS),
            Some(EVICTION_BATCH_SIZE),
        );

        let (clock, mock) = Clock::mock();
        let listener_call_count = Arc::new(AtomicU8::new(0));
        let lcc = Arc::clone(&listener_call_count);

        // A slow eviction listener that spends `LISTENER_DELAY` to process a
        // removal notification. (It advances the mock clock instead of really
        // sleeping, so the test itself stays fast.)
        let listener = move |_k, _v, _cause| {
            mock.increment(LISTENER_DELAY);
            lcc.fetch_add(1, Ordering::AcqRel);
        };

        // Create a cache with the LRU policy.
        let mut cache = Cache::builder()
            .max_capacity(MAX_CAPACITY)
            .eviction_policy(EvictionPolicy::lru())
            .eviction_listener(listener)
            .housekeeper_config(hk_conf)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        // Fill the cache.
        for i in 0..MAX_CAPACITY {
            let v = format!("v{i}");
            cache.insert(i, v)
        }
        // The entry count should still be zero because we have not called
        // `run_pending_tasks` yet.
        assert_eq!(cache.entry_count(), 0);

        // No evictions needed yet, so the listener must not have been called.
        cache.run_pending_tasks();
        assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
        assert_eq!(cache.entry_count(), MAX_CAPACITY);

        // Insert more items to the cache.
        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
            let v = format!("v{i}");
            cache.insert(i, v);
        }
        assert_eq!(cache.entry_count(), MAX_CAPACITY);

        cache.run_pending_tasks();
        // Because of the slow listener, cache should get an over capacity.
        // With an 11 ms listener delay and a 30 ms timeout, three
        // notifications fit in one `run_pending_tasks` call (presumably the
        // timeout trips after the third; confirm against the housekeeper).
        let mut expected_call_count = 3;
        assert_eq!(
            listener_call_count.load(Ordering::Acquire) as u64,
            expected_call_count
        );
        assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);

        // Keep running the pending tasks; each call should deliver three more
        // notifications until all MAX_CAPACITY LRU victims are evicted.
        loop {
            cache.run_pending_tasks();

            expected_call_count += 3;
            if expected_call_count > MAX_CAPACITY {
                expected_call_count = MAX_CAPACITY;
            }

            let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
            assert_eq!(actual_count, expected_call_count);
            let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
            assert_eq!(cache.entry_count(), expected_entry_count);

            if expected_call_count >= MAX_CAPACITY {
                break;
            }
        }

        assert_eq!(cache.entry_count(), MAX_CAPACITY);
    }
5121
    // Verifies that the cache keeps working (accepting inserts and
    // invalidations) even after the eviction listener has panicked.
    //
    // NOTE: To enable the panic logging, run the following command:
    //
    // RUST_LOG=moka=info cargo test --features 'logging' -- \
    //   sync::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
    //
    #[test]
    fn recover_from_panicking_eviction_listener() {
        #[cfg(feature = "logging")]
        let _ = env_logger::builder().is_test(true).try_init();

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener that panics when it sees
        // a value "panic now!".
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| {
            if v == "panic now!" {
                panic!("Panic now!");
            }
            a1.lock().push((k, v, cause))
        };

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .name("My Sync Cache")
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        // Insert an okay value.
        cache.insert("alice", "a0");
        cache.run_pending_tasks();

        // Insert a value that will cause the eviction listener to panic.
        cache.insert("alice", "panic now!");
        expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
        cache.run_pending_tasks();

        // Insert an okay value. This will replace the previous
        // value "panic now!" so the eviction listener will panic.
        cache.insert("alice", "a2");
        cache.run_pending_tasks();
        // No more removal notification should be sent after the panic, so
        // `expected` gains nothing from this point on.

        // Invalidate the okay value.
        cache.invalidate(&"alice");
        cache.run_pending_tasks();

        verify_notification_vec(&cache, actual, &expected);
    }
5177
5178    // This test ensures that the `contains_key`, `get` and `invalidate` can use
5179    // borrowed form `&[u8]` for key with type `Vec<u8>`.
5180    // https://github.com/moka-rs/moka/issues/166
5181    #[test]
5182    fn borrowed_forms_of_key() {
5183        let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5184
5185        let key = vec![1_u8];
5186        cache.insert(key.clone(), ());
5187
5188        // key as &Vec<u8>
5189        let key_v: &Vec<u8> = &key;
5190        assert!(cache.contains_key(key_v));
5191        assert_eq!(cache.get(key_v), Some(()));
5192        cache.invalidate(key_v);
5193
5194        cache.insert(key, ());
5195
5196        // key as &[u8]
5197        let key_s: &[u8] = &[1_u8];
5198        assert!(cache.contains_key(key_s));
5199        assert_eq!(cache.get(key_s), Some(()));
5200        cache.invalidate(key_s);
5201    }
5202
    // Verifies that an evicted or invalidated value is dropped immediately
    // (no lingering clones held inside the cache).
    //
    // Ignored by default. This test becomes unstable when run in parallel with
    // other tests.
    #[test]
    #[cfg_attr(not(run_flaky_tests), ignore)]
    fn drop_value_immediately_after_eviction() {
        use crate::common::test_utils::{Counters, Value};

        const MAX_CAPACITY: u32 = 500;
        // 20% more keys than the capacity, so 100 entries will be evicted.
        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;

        let counters = Arc::new(Counters::default());
        let counters1 = Arc::clone(&counters);

        // Count size-based evictions and explicit invalidations separately.
        let listener = move |_k, _v, cause| match cause {
            RemovalCause::Size => counters1.incl_evicted(),
            RemovalCause::Explicit => counters1.incl_invalidated(),
            _ => (),
        };

        let mut cache = Cache::builder()
            .max_capacity(MAX_CAPACITY as u64)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        // Insert KEYS entries, syncing after each one so evictions happen
        // incrementally.
        for key in 0..KEYS {
            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
            cache.insert(key, value);
            counters.incl_inserted();
            cache.run_pending_tasks();
        }

        let eviction_count = KEYS - MAX_CAPACITY;

        // Every evicted value must already have been dropped.
        cache.run_pending_tasks();
        assert_eq!(counters.inserted(), KEYS, "inserted");
        assert_eq!(counters.value_created(), KEYS, "value_created");
        assert_eq!(counters.evicted(), eviction_count, "evicted");
        assert_eq!(counters.invalidated(), 0, "invalidated");
        assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");

        // Invalidate every key; only the MAX_CAPACITY entries still resident
        // can produce an Explicit notification.
        for key in 0..KEYS {
            cache.invalidate(&key);
            cache.run_pending_tasks();
        }

        cache.run_pending_tasks();
        assert_eq!(counters.inserted(), KEYS, "inserted");
        assert_eq!(counters.value_created(), KEYS, "value_created");
        assert_eq!(counters.evicted(), eviction_count, "evicted");
        assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");

        // Dropping the cache must not drop anything further — all values were
        // already released.
        std::mem::drop(cache);
        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
    }
5262
5263    // For testing the issue reported by: https://github.com/moka-rs/moka/issues/383
5264    //
5265    // Ignored by default. This test becomes unstable when run in parallel with
5266    // other tests.
5267    #[test]
5268    #[cfg_attr(not(run_flaky_tests), ignore)]
5269    fn ensure_gc_runs_when_dropping_cache() {
5270        let cache = Cache::builder().build();
5271        let val = Arc::new(0);
5272        {
5273            let val = Arc::clone(&val);
5274            cache.get_with(1, move || val);
5275        }
5276        drop(cache);
5277        assert_eq!(Arc::strong_count(&val), 1);
5278    }
5279
5280    #[test]
5281    fn test_debug_format() {
5282        let cache = Cache::new(10);
5283        cache.insert('a', "alice");
5284        cache.insert('b', "bob");
5285        cache.insert('c', "cindy");
5286
5287        let debug_str = format!("{cache:?}");
5288        assert!(debug_str.starts_with('{'));
5289        assert!(debug_str.contains(r#"'a': "alice""#));
5290        assert!(debug_str.contains(r#"'b': "bob""#));
5291        assert!(debug_str.contains(r#"'c': "cindy""#));
5292        assert!(debug_str.ends_with('}'));
5293    }
5294
    /// A (key, value, removal-cause) triple as recorded by the eviction
    /// listeners in the tests above.
    type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);

    /// Asserts that the notifications accumulated in `actual` match `expected`
    /// (same length, same order, same contents).
    ///
    /// Because notifications are delivered by the cache's background
    /// processing, this helper retries — running the pending tasks and
    /// sleeping between attempts — before failing on a length mismatch.
    fn verify_notification_vec<K, V, S>(
        cache: &Cache<K, V, S>,
        actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
        expected: &[NotificationTuple<K, V>],
    ) where
        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
    {
        // Retries will be needed when testing in a QEMU VM.
        // (Note: `retries <= MAX_RETRIES` allows MAX_RETRIES + 1 extra passes.)
        const MAX_RETRIES: usize = 5;
        let mut retries = 0;
        loop {
            // Ensure all scheduled notifications have been processed.
            cache.run_pending_tasks();
            std::thread::sleep(Duration::from_millis(500));

            // The guard from `actual.lock()` lives to the end of this loop
            // iteration (temporary lifetime extension), then is released
            // before the next retry.
            let actual = &*actual.lock();
            if actual.len() != expected.len() {
                if retries <= MAX_RETRIES {
                    retries += 1;
                    continue;
                } else {
                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
                }
            }

            // Lengths match; compare element-wise so a mismatch reports the
            // exact index.
            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
                assert_eq!(actual, expected, "expected[{i}]");
            }

            break;
        }
    }
5331
    /// Reproduces a bug where `optionally_get_with` on an expired but not yet evicted
    /// entry causes the new value's expiration to be cleared, making it never expire.
    ///
    /// 1. `optionally_get_with` misses at t=0 and inserts value 1 with 2s expiry.
    /// 2. At t=1.98s, `optionally_get_with` still hits value 1.
    /// 3. At t=2.01s (expired, not yet evicted), `optionally_get_with` should miss and create value 2.
    /// 4. At t=12.01s, value 2 should also be expired; `optionally_get_with` should create a new value.
    #[test]
    fn test_optionally_get_with_expired_entry_bug() {
        use std::sync::atomic::{AtomicU32, Ordering};

        // Expiry policy: entries expire 2 seconds after creation; updates use
        // the trait's default behavior.
        struct CreateOnlyExpiry;

        impl Expiry<&str, u32> for CreateOnlyExpiry {
            fn expire_after_create(
                &self,
                _key: &&str,
                _value: &u32,
                _current_time: StdInstant,
            ) -> Option<Duration> {
                Some(Duration::from_secs(2))
            }
            // expire_after_update use defaults
        }

        let (clock, mock) = Clock::mock();

        let mut cache = Cache::builder()
            .max_capacity(100)
            .expire_after(CreateOnlyExpiry)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();
        let cache = cache;

        // Monotonically increasing counter for test values; each cache miss
        // yields the next integer.
        let counter = AtomicU32::new(0);
        let next_value = || {
            let v = counter.fetch_add(1, Ordering::SeqCst) + 1;
            Some(v)
        };

        // Step 1: t=0 — miss; value 1 is created with a 2s expiry.
        let value = cache.optionally_get_with("key", next_value);
        assert_eq!(value, Some(1), "First access should return 1");
        cache.run_pending_tasks();

        // Step 2: t=1.98s — still within the 2s expiry; must hit value 1.
        mock.increment(Duration::from_millis(1980));
        let value = cache.optionally_get_with("key", next_value);
        assert_eq!(value, Some(1), "Access at 1.98s should hit and return 1");

        // Step 3: t=2.01s — entry is expired but NOT evicted (no housekeeping
        // has run since the expiration); must miss and create value 2.
        mock.increment(Duration::from_millis(30)); // 2.01s
        let value = cache.optionally_get_with("key", next_value);
        assert_eq!(value, Some(2), "Access at 2.01s should miss and return 2");
        cache.run_pending_tasks();

        // Step 4: t=12.01s — 10 seconds later, value 2 should also have
        // expired. (With the bug, its expiration was cleared so it would
        // still be returned here.)
        mock.increment(Duration::from_secs(10)); // Now at 12.01s
        cache.run_pending_tasks();

        let value = cache.optionally_get_with("key", next_value);
        assert_ne!(value, Some(2), "Access at 12.01s should not still be 2");
    }
5397}