moka/sync/
cache.rs

1use super::{
2    value_initializer::{InitResult, ValueInitializer},
3    CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelector,
4};
5use crate::{
6    common::{
7        concurrent::{
8            constants::WRITE_RETRY_INTERVAL_MICROS, housekeeper::InnerSync, Weigher, WriteOp,
9        },
10        time::{Clock, Instant},
11        HousekeeperConfig,
12    },
13    notification::EvictionListener,
14    ops::compute::{self, CompResult},
15    policy::{EvictionPolicy, ExpirationPolicy},
16    sync::{Iter, PredicateId},
17    sync_base::{
18        base_cache::{BaseCache, HouseKeeperArc},
19        iter::ScanningGet,
20    },
21    Entry, Policy, PredicateError,
22};
23
24use crossbeam_channel::{Sender, TrySendError};
25use std::{
26    borrow::Borrow,
27    collections::hash_map::RandomState,
28    fmt,
29    hash::{BuildHasher, Hash},
30    sync::Arc,
31    time::Duration,
32};
33
34/// A thread-safe concurrent synchronous in-memory cache.
35///
36/// `Cache` supports full concurrency of retrievals and a high expected concurrency
37/// for updates.
38///
39/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
40/// storage. `Cache` performs a best-effort bounding of the map using an entry
41/// replacement algorithm to determine which entries to evict when the capacity is
42/// exceeded.
43///
44/// # Table of Contents
45///
46/// - [Example: `insert`, `get` and `invalidate`](#example-insert-get-and-invalidate)
47/// - [Avoiding to clone the value at `get`](#avoiding-to-clone-the-value-at-get)
48/// - [Sharing a cache across threads](#sharing-a-cache-across-threads)
49///     - [No lock is needed](#no-lock-is-needed)
50/// - [Hashing Algorithm](#hashing-algorithm)
51/// - [Example: Size-based Eviction](#example-size-based-eviction)
52/// - [Example: Time-based Expirations](#example-time-based-expirations)
53///     - [Cache-level TTL and TTI policies](#cache-level-ttl-and-tti-policies)
54///     - [Per-entry expiration policy](#per-entry-expiration-policy)
55/// - [Example: Eviction Listener](#example-eviction-listener)
56///     - [You should avoid eviction listener to
57///       panic](#you-should-avoid-eviction-listener-to-panic)
58///
59/// # Example: `insert`, `get` and `invalidate`
60///
61/// Cache entries are manually added using [`insert`](#method.insert) or
62/// [`get_with`](#method.get_with) methods, and are stored in the cache until either
63/// evicted or manually invalidated.
64///
65/// Here's an example of reading and updating a cache by using multiple threads:
66///
67/// ```rust
68/// use moka::sync::Cache;
69///
70/// use std::thread;
71///
72/// fn value(n: usize) -> String {
73///     format!("value {n}")
74/// }
75///
76/// const NUM_THREADS: usize = 16;
77/// const NUM_KEYS_PER_THREAD: usize = 64;
78///
79/// // Create a cache that can store up to 10,000 entries.
80/// let cache = Cache::new(10_000);
81///
82/// // Spawn threads and read and update the cache simultaneously.
83/// let threads: Vec<_> = (0..NUM_THREADS)
84///     .map(|i| {
85///         // To share the same cache across the threads, clone it.
86///         // This is a cheap operation.
87///         let my_cache = cache.clone();
88///         let start = i * NUM_KEYS_PER_THREAD;
89///         let end = (i + 1) * NUM_KEYS_PER_THREAD;
90///
91///         thread::spawn(move || {
92///             // Insert 64 entries. (NUM_KEYS_PER_THREAD = 64)
93///             for key in start..end {
94///                 my_cache.insert(key, value(key));
95///                 // get() returns Option<String>, a clone of the stored value.
96///                 assert_eq!(my_cache.get(&key), Some(value(key)));
97///             }
98///
///             // Invalidate every 4th element of the inserted entries.
100///             for key in (start..end).step_by(4) {
101///                 my_cache.invalidate(&key);
102///             }
103///         })
104///     })
105///     .collect();
106///
107/// // Wait for all threads to complete.
108/// threads.into_iter().for_each(|t| t.join().expect("Failed"));
109///
110/// // Verify the result.
111/// for key in 0..(NUM_THREADS * NUM_KEYS_PER_THREAD) {
112///     if key % 4 == 0 {
113///         assert_eq!(cache.get(&key), None);
114///     } else {
115///         assert_eq!(cache.get(&key), Some(value(key)));
116///     }
117/// }
118/// ```
119///
120/// If you want to atomically initialize and insert a value when the key is not
121/// present, you might want to check other insertion methods
122/// [`get_with`](#method.get_with) and [`try_get_with`](#method.try_get_with).
123///
124/// # Avoiding to clone the value at `get`
125///
126/// The return type of `get` method is `Option<V>` instead of `Option<&V>`. Every
127/// time `get` is called for an existing key, it creates a clone of the stored value
128/// `V` and returns it. This is because the `Cache` allows concurrent updates from
129/// threads so a value stored in the cache can be dropped or replaced at any time by
130/// any other thread. `get` cannot return a reference `&V` as it is impossible to
131/// guarantee the value outlives the reference.
132///
133/// If you want to store values that will be expensive to clone, wrap them by
134/// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
135/// thread-safe reference-counted pointer and its `clone()` method is cheap.
136///
137/// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
138///
139/// # Sharing a cache across threads
140///
/// To share a cache across threads, do one of the following:
142///
143/// - Create a clone of the cache by calling its `clone` method and pass it to other
144///   thread.
/// - Wrap the cache by a `sync::OnceCell` or `sync::Lazy` from the
///   [once_cell][once-cell-crate] crate, and set it to a `static` variable.
147///
148/// Cloning is a cheap operation for `Cache` as it only creates thread-safe
149/// reference-counted pointers to the internal data structures.
150///
151/// ## No lock is needed
152///
153/// Don't wrap a `Cache` by a lock such as `Mutex` or `RwLock`. All methods provided
154/// by the `Cache` are considered thread-safe, and can be safely called by multiple
155/// threads at the same time. No lock is needed.
156///
157/// [once-cell-crate]: https://crates.io/crates/once_cell
158///
159/// # Hashing Algorithm
160///
161/// By default, `Cache` uses a hashing algorithm selected to provide resistance
162/// against HashDoS attacks. It will be the same one used by
163/// `std::collections::HashMap`, which is currently SipHash 1-3.
164///
165/// While SipHash's performance is very competitive for medium sized keys, other
166/// hashing algorithms will outperform it for small keys such as integers as well as
167/// large keys such as long strings. However those algorithms will typically not
168/// protect against attacks such as HashDoS.
169///
170/// The hashing algorithm can be replaced on a per-`Cache` basis using the
171/// [`build_with_hasher`][build-with-hasher-method] method of the `CacheBuilder`.
172/// Many alternative algorithms are available on crates.io, such as the
173/// [AHash][ahash-crate] crate.
174///
175/// [build-with-hasher-method]: ./struct.CacheBuilder.html#method.build_with_hasher
176/// [ahash-crate]: https://crates.io/crates/ahash
177///
178/// # Example: Size-based Eviction
179///
180/// ```rust
181/// use moka::sync::Cache;
182///
183/// // Evict based on the number of entries in the cache.
184/// let cache = Cache::builder()
185///     // Up to 10,000 entries.
186///     .max_capacity(10_000)
187///     // Create the cache.
188///     .build();
189/// cache.insert(1, "one".to_string());
190///
191/// // Evict based on the byte length of strings in the cache.
192/// let cache = Cache::builder()
193///     // A weigher closure takes &K and &V and returns a u32
194///     // representing the relative size of the entry.
195///     .weigher(|_key, value: &String| -> u32 {
196///         value.len().try_into().unwrap_or(u32::MAX)
197///     })
198///     // This cache will hold up to 32MiB of values.
199///     .max_capacity(32 * 1024 * 1024)
200///     .build();
201/// cache.insert(2, "two".to_string());
202/// ```
203///
204/// If your cache should not grow beyond a certain size, use the `max_capacity`
205/// method of the [`CacheBuilder`][builder-struct] to set the upper bound. The cache
206/// will try to evict entries that have not been used recently or very often.
207///
208/// At the cache creation time, a weigher closure can be set by the `weigher` method
209/// of the `CacheBuilder`. A weigher closure takes `&K` and `&V` as the arguments and
210/// returns a `u32` representing the relative size of the entry:
211///
/// - If the `weigher` is _not_ set, the cache will treat each entry as having the
///   same size of `1`. This means the cache will be bounded by the number of entries.
214/// - If the `weigher` is set, the cache will call the weigher to calculate the
215///   weighted size (relative size) on an entry. This means the cache will be bounded
216///   by the total weighted size of entries.
217///
218/// Note that weighted sizes are not used when making eviction selections.
219///
220/// [builder-struct]: ./struct.CacheBuilder.html
221///
222/// # Example: Time-based Expirations
223///
224/// ## Cache-level TTL and TTI policies
225///
226/// `Cache` supports the following cache-level expiration policies:
227///
228/// - **Time to live (TTL)**: A cached entry will be expired after the specified
229///   duration past from `insert`.
230/// - **Time to idle (TTI)**: A cached entry will be expired after the specified
231///   duration past from `get` or `insert`.
232///
/// These are cache-level expiration policies; all entries in the cache will have
/// the same TTL and/or TTI durations. If you want to set different expiration
/// durations for different entries, see the next section.
236///
237/// ```rust
238/// use moka::sync::Cache;
239/// use std::time::Duration;
240///
241/// let cache = Cache::builder()
242///     // Time to live (TTL): 30 minutes
243///     .time_to_live(Duration::from_secs(30 * 60))
244///     // Time to idle (TTI):  5 minutes
245///     .time_to_idle(Duration::from_secs( 5 * 60))
246///     // Create the cache.
247///     .build();
248///
249/// // This entry will expire after 5 minutes (TTI) if there is no get().
250/// cache.insert(0, "zero");
251///
252/// // This get() will extend the entry life for another 5 minutes.
253/// cache.get(&0);
254///
255/// // Even though we keep calling get(), the entry will expire
256/// // after 30 minutes (TTL) from the insert().
257/// ```
258///
259/// ## Per-entry expiration policy
260///
261/// `Cache` supports per-entry expiration policy through the `Expiry` trait.
262///
263/// `Expiry` trait provides three callback methods:
264/// [`expire_after_create`][exp-create], [`expire_after_read`][exp-read] and
265/// [`expire_after_update`][exp-update]. When a cache entry is inserted, read or
266/// updated, one of these methods is called. These methods return an
267/// `Option<Duration>`, which is used as the expiration duration of the entry.
268///
269/// `Expiry` trait provides the default implementations of these methods, so you will
270/// implement only the methods you want to customize.
271///
272/// [exp-create]: ../trait.Expiry.html#method.expire_after_create
273/// [exp-read]: ../trait.Expiry.html#method.expire_after_read
274/// [exp-update]: ../trait.Expiry.html#method.expire_after_update
275///
276/// ```rust
277/// use moka::{sync::Cache, Expiry};
278/// use std::time::{Duration, Instant};
279///
280/// // In this example, we will create a `sync::Cache` with `u32` as the key, and
281/// // `(Expiration, String)` as the value. `Expiration` is an enum to represent the
282/// // expiration of the value, and `String` is the application data of the value.
283///
284/// /// An enum to represent the expiration of a value.
285/// #[derive(Clone, Copy, Debug, Eq, PartialEq)]
286/// pub enum Expiration {
287///     /// The value never expires.
288///     Never,
289///     /// The value expires after a short time. (5 seconds in this example)
290///     AfterShortTime,
291///     /// The value expires after a long time. (15 seconds in this example)
292///     AfterLongTime,
293/// }
294///
295/// impl Expiration {
296///     /// Returns the duration of this expiration.
297///     pub fn as_duration(&self) -> Option<Duration> {
298///         match self {
299///             Expiration::Never => None,
300///             Expiration::AfterShortTime => Some(Duration::from_secs(5)),
301///             Expiration::AfterLongTime => Some(Duration::from_secs(15)),
302///         }
303///     }
304/// }
305///
306/// /// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
307/// /// default implementations of three callback methods `expire_after_create`,
308/// /// `expire_after_read`, and `expire_after_update`.
309/// ///
310/// /// In this example, we only override the `expire_after_create` method.
311/// pub struct MyExpiry;
312///
313/// impl Expiry<u32, (Expiration, String)> for MyExpiry {
314///     /// Returns the duration of the expiration of the value that was just
315///     /// created.
316///     fn expire_after_create(
317///         &self,
318///         _key: &u32,
319///         value: &(Expiration, String),
320///         _current_time: Instant,
321///     ) -> Option<Duration> {
322///         let duration = value.0.as_duration();
323///         println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
324///         duration
325///     }
326/// }
327///
328/// // Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
329/// // eviction listener.
330/// let expiry = MyExpiry;
331///
332/// let eviction_listener = |key, _value, cause| {
333///     println!("Evicted key {key}. Cause: {cause:?}");
334/// };
335///
336/// let cache = Cache::builder()
337///     .max_capacity(100)
338///     .expire_after(expiry)
339///     .eviction_listener(eviction_listener)
340///     .build();
341///
342/// // Insert some entries into the cache with different expirations.
343/// cache.get_with(0, || (Expiration::AfterShortTime, "a".to_string()));
344/// cache.get_with(1, || (Expiration::AfterLongTime, "b".to_string()));
345/// cache.get_with(2, || (Expiration::Never, "c".to_string()));
346///
347/// // Verify that all the inserted entries exist.
348/// assert!(cache.contains_key(&0));
349/// assert!(cache.contains_key(&1));
350/// assert!(cache.contains_key(&2));
351///
352/// // Sleep for 6 seconds. Key 0 should expire.
353/// println!("\nSleeping for 6 seconds...\n");
354/// std::thread::sleep(Duration::from_secs(6));
355/// println!("Entry count: {}", cache.entry_count());
356///
357/// // Verify that key 0 has been evicted.
358/// assert!(!cache.contains_key(&0));
359/// assert!(cache.contains_key(&1));
360/// assert!(cache.contains_key(&2));
361///
362/// // Sleep for 10 more seconds. Key 1 should expire.
363/// println!("\nSleeping for 10 seconds...\n");
364/// std::thread::sleep(Duration::from_secs(10));
365/// println!("Entry count: {}", cache.entry_count());
366///
367/// // Verify that key 1 has been evicted.
368/// assert!(!cache.contains_key(&1));
369/// assert!(cache.contains_key(&2));
370///
371/// // Manually invalidate key 2.
372/// cache.invalidate(&2);
373/// assert!(!cache.contains_key(&2));
374///
375/// println!("\nSleeping for a second...\n");
376/// std::thread::sleep(Duration::from_secs(1));
377/// println!("Entry count: {}", cache.entry_count());
378///
379/// println!("\nDone!");
380/// ```
381///
382/// # Example: Eviction Listener
383///
384/// A `Cache` can be configured with an eviction listener, a closure that is called
385/// every time there is a cache eviction. The listener takes three parameters: the
386/// key and value of the evicted entry, and the
387/// [`RemovalCause`](../notification/enum.RemovalCause.html) to indicate why the
388/// entry was evicted.
389///
390/// An eviction listener can be used to keep other data structures in sync with the
391/// cache, for example.
392///
393/// The following example demonstrates how to use an eviction listener with
394/// time-to-live expiration to manage the lifecycle of temporary files on a
395/// filesystem. The cache stores the paths of the files, and when one of them has
396/// expired, the eviction listener will be called with the path, so it can remove the
397/// file from the filesystem.
398///
399/// ```rust
400/// // Cargo.toml
401/// //
402/// // [dependencies]
403/// // anyhow = "1.0"
404/// // uuid = { version = "1.1", features = ["v4"] }
405///
406/// use moka::{sync::Cache, notification};
407///
408/// use anyhow::{anyhow, Context};
409/// use std::{
410///     fs, io,
411///     path::{Path, PathBuf},
412///     sync::{Arc, RwLock},
413///     time::Duration,
414/// };
415/// use uuid::Uuid;
416///
417/// /// The DataFileManager writes, reads and removes data files.
418/// struct DataFileManager {
419///     base_dir: PathBuf,
420///     file_count: usize,
421/// }
422///
423/// impl DataFileManager {
424///     fn new(base_dir: PathBuf) -> Self {
425///         Self {
426///             base_dir,
427///             file_count: 0,
428///         }
429///     }
430///
431///     fn write_data_file(
432///         &mut self,
433///         key: impl AsRef<str>,
434///         contents: String
435///     ) -> io::Result<PathBuf> {
436///         // Use the key as a part of the filename.
437///         let mut path = self.base_dir.to_path_buf();
438///         path.push(key.as_ref());
439///
440///         assert!(!path.exists(), "Path already exists: {path:?}");
441///
442///         // create the file at the path and write the contents to the file.
443///         fs::write(&path, contents)?;
444///         self.file_count += 1;
445///         println!("Created a data file at {path:?} (file count: {})", self.file_count);
446///         Ok(path)
447///     }
448///
449///     fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
450///         // Reads the contents of the file at the path, and return the contents.
451///         fs::read_to_string(path)
452///     }
453///
454///     fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
455///         // Remove the file at the path.
456///         fs::remove_file(path.as_ref())?;
457///         self.file_count -= 1;
458///         println!(
459///             "Removed a data file at {:?} (file count: {})",
460///             path.as_ref(),
461///             self.file_count
462///         );
463///
464///         Ok(())
465///     }
466/// }
467///
468/// fn main() -> anyhow::Result<()> {
469///     // Create an instance of the DataFileManager and wrap it with
470///     // Arc<RwLock<_>> so it can be shared across threads.
471///     let mut base_dir = std::env::temp_dir();
472///     base_dir.push(Uuid::new_v4().as_hyphenated().to_string());
473///     println!("base_dir: {base_dir:?}");
474///     std::fs::create_dir(&base_dir)?;
475///
476///     let file_mgr = DataFileManager::new(base_dir);
477///     let file_mgr = Arc::new(RwLock::new(file_mgr));
478///
479///     let file_mgr1 = Arc::clone(&file_mgr);
480///
481///     // Create an eviction listener closure.
482///     let eviction_listener = move |k, v: PathBuf, cause| {
483///         // Try to remove the data file at the path `v`.
484///         println!("\n== An entry has been evicted. k: {k:?}, v: {v:?}, cause: {cause:?}");
485///
486///         // Acquire the write lock of the DataFileManager. We must handle
487///         // error cases here to prevent the listener from panicking.
488///         match file_mgr1.write() {
489///             Err(_e) => {
490///                 eprintln!("The lock has been poisoned");
491///             }
492///             Ok(mut mgr) => {
493///                 // Remove the data file using the DataFileManager.
494///                 if let Err(_e) = mgr.remove_data_file(v.as_path()) {
495///                     eprintln!("Failed to remove a data file at {v:?}");
496///                 }
497///             }
498///         }
499///     };
500///
501///     // Create the cache. Set time to live for two seconds and set the
502///     // eviction listener.
503///     let cache = Cache::builder()
504///         .max_capacity(100)
505///         .time_to_live(Duration::from_secs(2))
506///         .eviction_listener(eviction_listener)
507///         .build();
508///
509///     // Insert an entry to the cache.
510///     // This will create and write a data file for the key "user1", store the
511///     // path of the file to the cache, and return it.
512///     println!("== try_get_with()");
513///     let key = "user1";
514///     let path = cache
515///         .try_get_with(key, || -> anyhow::Result<_> {
516///             let mut mgr = file_mgr
517///                 .write()
518///                 .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
519///             let path = mgr
520///                 .write_data_file(key, "user data".into())
521///                 .with_context(|| format!("Failed to create a data file"))?;
522///             Ok(path)
523///         })
524///         .map_err(|e| anyhow!("{e}"))?;
525///
526///     // Read the data file at the path and print the contents.
527///     println!("\n== read_data_file()");
528///     {
529///         let mgr = file_mgr
530///             .read()
531///             .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
532///         let contents = mgr
533///             .read_data_file(path.as_path())
534///             .with_context(|| format!("Failed to read data from {path:?}"))?;
535///         println!("contents: {contents}");
536///     }
537///
538///     // Sleep for five seconds. While sleeping, the cache entry for key "user1"
539///     // will be expired and evicted, so the eviction listener will be called to
540///     // remove the file.
541///     std::thread::sleep(Duration::from_secs(5));
542///
543///     cache.run_pending_tasks();
544///
545///     Ok(())
546/// }
547/// ```
548///
549/// ## You should avoid eviction listener to panic
550///
551/// It is very important to make an eviction listener closure not to panic.
552/// Otherwise, the cache will stop calling the listener after a panic. This is an
553/// intended behavior because the cache cannot know whether it is memory safe or not
554/// to call the panicked listener again.
555///
556/// When a listener panics, the cache will swallow the panic and disable the
557/// listener. If you want to know when a listener panics and the reason of the panic,
558/// you can enable an optional `logging` feature of Moka and check error-level logs.
559///
/// To enable the `logging`, do the following:
561///
562/// 1. In `Cargo.toml`, add the crate feature `logging` for `moka`.
563/// 2. Set the logging level for `moka` to `error` or any lower levels (`warn`,
564///    `info`, ...):
565///     - If you are using the `env_logger` crate, you can achieve this by setting
566///       `RUST_LOG` environment variable to `moka=error`.
/// 3. If you have more than one cache, you may want to set a distinct name for each
///    cache by using cache builder's [`name`][builder-name-method] method. The name
///    will appear in the log.
570///
571/// [builder-name-method]: ./struct.CacheBuilder.html#method.name
572///
pub struct Cache<K, V, S = RandomState> {
    /// The underlying `BaseCache` that the public methods of this type delegate
    /// to. Cloning a `Cache` clones this handle.
    pub(crate) base: BaseCache<K, V, S>,
    /// Reference-counted `ValueInitializer`, constructed from the same
    /// `BuildHasher` as `base` and shared between clones of this cache.
    // NOTE(review): presumably backs the `get_with`-style initializing methods;
    // confirm against their implementations.
    value_initializer: Arc<ValueInitializer<K, V, S>>,
}
577
578// TODO: https://github.com/moka-rs/moka/issues/54
579#[allow(clippy::non_send_fields_in_send_ty)]
580unsafe impl<K, V, S> Send for Cache<K, V, S>
581where
582    K: Send + Sync,
583    V: Send + Sync,
584    S: Send,
585{
586}
587
588unsafe impl<K, V, S> Sync for Cache<K, V, S>
589where
590    K: Send + Sync,
591    V: Send + Sync,
592    S: Sync,
593{
594}
595
596// NOTE: We cannot do `#[derive(Clone)]` because it will add `Clone` bound to `K`.
597impl<K, V, S> Clone for Cache<K, V, S> {
598    /// Makes a clone of this shared cache.
599    ///
600    /// This operation is cheap as it only creates thread-safe reference counted
601    /// pointers to the shared internal data structures.
602    fn clone(&self) -> Self {
603        Self {
604            base: self.base.clone(),
605            value_initializer: Arc::clone(&self.value_initializer),
606        }
607    }
608}
609
610impl<K, V, S> fmt::Debug for Cache<K, V, S>
611where
612    K: fmt::Debug + Eq + Hash + Send + Sync + 'static,
613    V: fmt::Debug + Clone + Send + Sync + 'static,
614    // TODO: Remove these bounds from S.
615    S: BuildHasher + Clone + Send + Sync + 'static,
616{
617    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618        let mut d_map = f.debug_map();
619
620        for (k, v) in self {
621            d_map.entry(&k, &v);
622        }
623
624        d_map.finish()
625    }
626}
627
628impl<K, V, S> Cache<K, V, S> {
629    /// Returns cache’s name.
630    pub fn name(&self) -> Option<&str> {
631        self.base.name()
632    }
633
634    /// Returns a read-only cache policy of this cache.
635    ///
636    /// At this time, cache policy cannot be modified after cache creation.
637    /// A future version may support to modify it.
638    pub fn policy(&self) -> Policy {
639        self.base.policy()
640    }
641
642    /// Returns an approximate number of entries in this cache.
643    ///
644    /// The value returned is _an estimate_; the actual count may differ if there are
645    /// concurrent insertions or removals, or if some entries are pending removal due
646    /// to expiration. This inaccuracy can be mitigated by performing a `sync()`
647    /// first.
648    ///
649    /// # Example
650    ///
651    /// ```rust
652    /// use moka::sync::Cache;
653    ///
654    /// let cache = Cache::new(10);
655    /// cache.insert('n', "Netherland Dwarf");
656    /// cache.insert('l', "Lop Eared");
657    /// cache.insert('d', "Dutch");
658    ///
659    /// // Ensure an entry exists.
660    /// assert!(cache.contains_key(&'n'));
661    ///
662    /// // However, followings may print stale number zeros instead of threes.
663    /// println!("{}", cache.entry_count());   // -> 0
664    /// println!("{}", cache.weighted_size()); // -> 0
665    ///
666    /// // To mitigate the inaccuracy, Call `run_pending_tasks` method to run
667    /// // pending internal tasks.
668    /// cache.run_pending_tasks();
669    ///
670    /// // Followings will print the actual numbers.
671    /// println!("{}", cache.entry_count());   // -> 3
672    /// println!("{}", cache.weighted_size()); // -> 3
673    /// ```
674    ///
675    pub fn entry_count(&self) -> u64 {
676        self.base.entry_count()
677    }
678
679    /// Returns an approximate total weighted size of entries in this cache.
680    ///
681    /// The value returned is _an estimate_; the actual size may differ if there are
682    /// concurrent insertions or removals, or if some entries are pending removal due
683    /// to expiration. This inaccuracy can be mitigated by performing a `sync()`
684    /// first. See [`entry_count`](#method.entry_count) for a sample code.
685    pub fn weighted_size(&self) -> u64 {
686        self.base.weighted_size()
687    }
688}
689
690impl<K, V> Cache<K, V, RandomState>
691where
692    K: Hash + Eq + Send + Sync + 'static,
693    V: Clone + Send + Sync + 'static,
694{
695    /// Constructs a new `Cache<K, V>` that will store up to the `max_capacity`.
696    ///
697    /// To adjust various configuration knobs such as `initial_capacity` or
698    /// `time_to_live`, use the [`CacheBuilder`][builder-struct].
699    ///
700    /// [builder-struct]: ./struct.CacheBuilder.html
701    pub fn new(max_capacity: u64) -> Self {
702        let build_hasher = RandomState::default();
703        Self::with_everything(
704            None,
705            Some(max_capacity),
706            None,
707            build_hasher,
708            None,
709            EvictionPolicy::default(),
710            None,
711            ExpirationPolicy::default(),
712            HousekeeperConfig::default(),
713            false,
714            Clock::default(),
715        )
716    }
717
718    /// Returns a [`CacheBuilder`][builder-struct], which can builds a `Cache` or
719    /// `SegmentedCache` with various configuration knobs.
720    ///
721    /// [builder-struct]: ./struct.CacheBuilder.html
722    pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>> {
723        CacheBuilder::default()
724    }
725}
726
727impl<K, V, S> Cache<K, V, S>
728where
729    K: Hash + Eq + Send + Sync + 'static,
730    V: Clone + Send + Sync + 'static,
731    S: BuildHasher + Clone + Send + Sync + 'static,
732{
733    // https://rust-lang.github.io/rust-clippy/master/index.html#too_many_arguments
734    #[allow(clippy::too_many_arguments)]
735    pub(crate) fn with_everything(
736        name: Option<String>,
737        max_capacity: Option<u64>,
738        initial_capacity: Option<usize>,
739        build_hasher: S,
740        weigher: Option<Weigher<K, V>>,
741        eviction_policy: EvictionPolicy,
742        eviction_listener: Option<EvictionListener<K, V>>,
743        expiration_policy: ExpirationPolicy<K, V>,
744        housekeeper_config: HousekeeperConfig,
745        invalidator_enabled: bool,
746        clock: Clock,
747    ) -> Self {
748        Self {
749            base: BaseCache::new(
750                name,
751                max_capacity,
752                initial_capacity,
753                build_hasher.clone(),
754                weigher,
755                eviction_policy,
756                eviction_listener,
757                expiration_policy,
758                housekeeper_config,
759                invalidator_enabled,
760                clock,
761            ),
762            value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)),
763        }
764    }
765
766    /// Returns `true` if the cache contains a value for the key.
767    ///
768    /// Unlike the `get` method, this method is not considered a cache read operation,
769    /// so it does not update the historic popularity estimator or reset the idle
770    /// timer for the key.
771    ///
772    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
773    /// on the borrowed form _must_ match those for the key type.
774    pub fn contains_key<Q>(&self, key: &Q) -> bool
775    where
776        K: Borrow<Q>,
777        Q: Hash + Eq + ?Sized,
778    {
779        self.base.contains_key_with_hash(key, self.base.hash(key))
780    }
781
782    pub(crate) fn contains_key_with_hash<Q>(&self, key: &Q, hash: u64) -> bool
783    where
784        K: Borrow<Q>,
785        Q: Hash + Eq + ?Sized,
786    {
787        self.base.contains_key_with_hash(key, hash)
788    }
789
790    /// Returns a _clone_ of the value corresponding to the key.
791    ///
792    /// If you want to store values that will be expensive to clone, wrap them by
793    /// `std::sync::Arc` before storing in a cache. [`Arc`][rustdoc-std-arc] is a
794    /// thread-safe reference-counted pointer and its `clone()` method is cheap.
795    ///
796    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
797    /// on the borrowed form _must_ match those for the key type.
798    ///
799    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
800    pub fn get<Q>(&self, key: &Q) -> Option<V>
801    where
802        K: Borrow<Q>,
803        Q: Hash + Eq + ?Sized,
804    {
805        self.base
806            .get_with_hash(key, self.base.hash(key), false)
807            .map(Entry::into_value)
808    }
809
810    pub(crate) fn get_with_hash<Q>(&self, key: &Q, hash: u64, need_key: bool) -> Option<Entry<K, V>>
811    where
812        K: Borrow<Q>,
813        Q: Hash + Eq + ?Sized,
814    {
815        self.base.get_with_hash(key, hash, need_key)
816    }
817
818    /// Takes a key `K` and returns an [`OwnedKeyEntrySelector`] that can be used to
819    /// select or insert an entry.
820    ///
821    /// [`OwnedKeyEntrySelector`]: ./struct.OwnedKeyEntrySelector.html
822    ///
823    /// # Example
824    ///
825    /// ```rust
826    /// use moka::sync::Cache;
827    ///
828    /// let cache: Cache<String, u32> = Cache::new(100);
829    /// let key = "key1".to_string();
830    ///
831    /// let entry = cache.entry(key.clone()).or_insert(3);
832    /// assert!(entry.is_fresh());
833    /// assert_eq!(entry.key(), &key);
834    /// assert_eq!(entry.into_value(), 3);
835    ///
836    /// let entry = cache.entry(key).or_insert(6);
837    /// // Not fresh because the value was already in the cache.
838    /// assert!(!entry.is_fresh());
839    /// assert_eq!(entry.into_value(), 3);
840    /// ```
841    pub fn entry(&self, key: K) -> OwnedKeyEntrySelector<'_, K, V, S>
842    where
843        K: Hash + Eq,
844    {
845        let hash = self.base.hash(&key);
846        OwnedKeyEntrySelector::new(key, hash, self)
847    }
848
    /// Takes a reference `&Q` of a key and returns a [`RefKeyEntrySelector`] that
850    /// can be used to select or insert an entry.
851    ///
852    /// [`RefKeyEntrySelector`]: ./struct.RefKeyEntrySelector.html
853    ///
854    /// # Example
855    ///
856    /// ```rust
857    /// use moka::sync::Cache;
858    ///
859    /// let cache: Cache<String, u32> = Cache::new(100);
860    /// let key = "key1".to_string();
861    ///
862    /// let entry = cache.entry_by_ref(&key).or_insert(3);
863    /// assert!(entry.is_fresh());
864    /// assert_eq!(entry.key(), &key);
865    /// assert_eq!(entry.into_value(), 3);
866    ///
867    /// let entry = cache.entry_by_ref(&key).or_insert(6);
868    /// // Not fresh because the value was already in the cache.
869    /// assert!(!entry.is_fresh());
870    /// assert_eq!(entry.into_value(), 3);
871    /// ```
872    pub fn entry_by_ref<'a, Q>(&'a self, key: &'a Q) -> RefKeyEntrySelector<'a, K, Q, V, S>
873    where
874        K: Borrow<Q>,
875        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
876    {
877        let hash = self.base.hash(key);
878        RefKeyEntrySelector::new(key, hash, self)
879    }
880
881    /// Returns a _clone_ of the value corresponding to the key. If the value does
882    /// not exist, evaluates the `init` closure and inserts the output.
883    ///
884    /// # Concurrent calls on the same key
885    ///
886    /// This method guarantees that concurrent calls on the same not-existing key are
887    /// coalesced into one evaluation of the `init` closure. Only one of the calls
888    /// evaluates its closure, and other calls wait for that closure to complete.
889    ///
890    /// The following code snippet demonstrates this behavior:
891    ///
892    /// ```rust
893    /// use moka::sync::Cache;
894    /// use std::{sync::Arc, thread};
895    ///
896    /// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
897    /// let cache = Cache::new(100);
898    ///
899    /// // Spawn four threads.
900    /// let threads: Vec<_> = (0..4_u8)
901    ///     .map(|task_id| {
902    ///         let my_cache = cache.clone();
903    ///         thread::spawn(move || {
904    ///             println!("Thread {task_id} started.");
905    ///
906    ///             // Try to insert and get the value for key1. Although all four
907    ///             // threads will call `get_with` at the same time, the `init` closure
908    ///             // must be evaluated only once.
909    ///             let value = my_cache.get_with("key1", || {
910    ///                 println!("Thread {task_id} inserting a value.");
911    ///                 Arc::new(vec![0u8; TEN_MIB])
912    ///             });
913    ///
914    ///             // Ensure the value exists now.
915    ///             assert_eq!(value.len(), TEN_MIB);
916    ///             assert!(my_cache.get(&"key1").is_some());
917    ///
918    ///             println!("Thread {task_id} got the value. (len: {})", value.len());
919    ///         })
920    ///     })
921    ///     .collect();
922    ///
923    /// // Wait all threads to complete.
924    /// threads
925    ///     .into_iter()
926    ///     .for_each(|t| t.join().expect("Thread failed"));
927    /// ```
928    ///
929    /// **Result**
930    ///
931    /// - The `init` closure was called exactly once by thread 1.
932    /// - Other threads were blocked until thread 1 inserted the value.
933    ///
934    /// ```console
935    /// Thread 1 started.
936    /// Thread 0 started.
937    /// Thread 3 started.
938    /// Thread 2 started.
939    /// Thread 1 inserting a value.
940    /// Thread 2 got the value. (len: 10485760)
941    /// Thread 1 got the value. (len: 10485760)
942    /// Thread 0 got the value. (len: 10485760)
943    /// Thread 3 got the value. (len: 10485760)
944    /// ```
945    ///
946    /// # Panics
947    ///
948    /// This method panics when the `init` closure has panicked. When it happens,
949    /// only the caller whose `init` closure panicked will get the panic (e.g. only
950    /// thread 1 in the above sample). If there are other calls in progress (e.g.
    /// thread 0, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closures.
953    ///
954    pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V {
955        let hash = self.base.hash(&key);
956        let key = Arc::new(key);
957        let replace_if = None as Option<fn(&V) -> bool>;
958        self.get_or_insert_with_hash_and_fun(key, hash, init, replace_if, false)
959            .into_value()
960    }
961
962    /// Similar to [`get_with`](#method.get_with), but instead of passing an owned
963    /// key, you can pass a reference to the key. If the key does not exist in the
964    /// cache, the key will be cloned to create new entry in the cache.
965    pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
966    where
967        K: Borrow<Q>,
968        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
969    {
970        let hash = self.base.hash(key);
971        let replace_if = None as Option<fn(&V) -> bool>;
972
973        self.get_or_insert_with_hash_by_ref_and_fun(key, hash, init, replace_if, false)
974            .into_value()
975    }
976
977    /// TODO: Remove this in v0.13.0.
978    /// Deprecated, replaced with
979    /// [`entry()::or_insert_with_if()`](./struct.OwnedKeyEntrySelector.html#method.or_insert_with_if)
980    #[deprecated(since = "0.10.0", note = "Replaced with `entry().or_insert_with_if()`")]
981    pub fn get_with_if(
982        &self,
983        key: K,
984        init: impl FnOnce() -> V,
985        replace_if: impl FnMut(&V) -> bool,
986    ) -> V {
987        let hash = self.base.hash(&key);
988        let key = Arc::new(key);
989        self.get_or_insert_with_hash_and_fun(key, hash, init, Some(replace_if), false)
990            .into_value()
991    }
992
993    pub(crate) fn get_or_insert_with_hash_and_fun(
994        &self,
995        key: Arc<K>,
996        hash: u64,
997        init: impl FnOnce() -> V,
998        mut replace_if: Option<impl FnMut(&V) -> bool>,
999        need_key: bool,
1000    ) -> Entry<K, V> {
1001        self.base
1002            .get_with_hash_and_ignore_if(&key, hash, replace_if.as_mut(), need_key)
1003            .unwrap_or_else(|| self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key))
1004    }
1005
    // A separate function is needed here instead of reusing the existing
    // `get_or_insert_with_hash_and_fun`, because the `by_ref` variant requires the
    // key reference type to implement `ToOwned`. Adding that bound to the existing
    // `get_or_insert_with_hash_and_fun` would require all the existing APIs that
    // depend on it to add a `ToOwned` bound as well.
1011    pub(crate) fn get_or_insert_with_hash_by_ref_and_fun<Q>(
1012        &self,
1013        key: &Q,
1014        hash: u64,
1015        init: impl FnOnce() -> V,
1016        mut replace_if: Option<impl FnMut(&V) -> bool>,
1017        need_key: bool,
1018    ) -> Entry<K, V>
1019    where
1020        K: Borrow<Q>,
1021        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1022    {
1023        self.base
1024            .get_with_hash_and_ignore_if(key, hash, replace_if.as_mut(), need_key)
1025            .unwrap_or_else(|| {
1026                let key = Arc::new(key.to_owned());
1027                self.insert_with_hash_and_fun(key, hash, init, replace_if, need_key)
1028            })
1029    }
1030
1031    pub(crate) fn insert_with_hash_and_fun(
1032        &self,
1033        key: Arc<K>,
1034        hash: u64,
1035        init: impl FnOnce() -> V,
1036        mut replace_if: Option<impl FnMut(&V) -> bool>,
1037        need_key: bool,
1038    ) -> Entry<K, V> {
1039        let get = || {
1040            self.base
1041                .get_with_hash_without_recording(&key, hash, replace_if.as_mut())
1042        };
1043        let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1044
1045        let k = if need_key {
1046            Some(Arc::clone(&key))
1047        } else {
1048            None
1049        };
1050
1051        let type_id = ValueInitializer::<K, V, S>::type_id_for_get_with();
1052        let post_init = ValueInitializer::<K, V, S>::post_init_for_get_with;
1053
1054        match self
1055            .value_initializer
1056            .try_init_or_read(&key, type_id, get, init, insert, post_init)
1057        {
1058            InitResult::Initialized(v) => {
1059                crossbeam_epoch::pin().flush();
1060                Entry::new(k, v, true, false)
1061            }
1062            InitResult::ReadExisting(v) => Entry::new(k, v, false, false),
1063            InitResult::InitErr(_) => unreachable!(),
1064        }
1065    }
1066
1067    pub(crate) fn get_or_insert_with_hash(
1068        &self,
1069        key: Arc<K>,
1070        hash: u64,
1071        init: impl FnOnce() -> V,
1072    ) -> Entry<K, V> {
1073        match self.base.get_with_hash(&key, hash, true) {
1074            Some(entry) => entry,
1075            None => {
1076                let value = init();
1077                self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1078                Entry::new(Some(key), value, true, false)
1079            }
1080        }
1081    }
1082
1083    pub(crate) fn get_or_insert_with_hash_by_ref<Q>(
1084        &self,
1085        key: &Q,
1086        hash: u64,
1087        init: impl FnOnce() -> V,
1088    ) -> Entry<K, V>
1089    where
1090        K: Borrow<Q>,
1091        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1092    {
1093        match self.base.get_with_hash(key, hash, true) {
1094            Some(entry) => entry,
1095            None => {
1096                let key = Arc::new(key.to_owned());
1097                let value = init();
1098                self.insert_with_hash(Arc::clone(&key), hash, value.clone());
1099                Entry::new(Some(key), value, true, false)
1100            }
1101        }
1102    }
1103
1104    /// Returns a _clone_ of the value corresponding to the key. If the value does
1105    /// not exist, evaluates the `init` closure, and inserts the value if
1106    /// `Some(value)` was returned. If `None` was returned from the closure, this
1107    /// method does not insert a value and returns `None`.
1108    ///
1109    /// # Concurrent calls on the same key
1110    ///
1111    /// This method guarantees that concurrent calls on the same not-existing key are
1112    /// coalesced into one evaluation of the `init` closure. Only one of the calls
1113    /// evaluates its closure, and other calls wait for that closure to complete.
1114    ///
1115    /// The following code snippet demonstrates this behavior:
1116    ///
1117    /// ```rust
1118    /// use moka::sync::Cache;
1119    /// use std::{path::Path, thread};
1120    ///
1121    /// /// This function tries to get the file size in bytes.
1122    /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Option<u64> {
1123    ///     println!("get_file_size() called by thread {thread_id}.");
1124    ///     std::fs::metadata(path).ok().map(|m| m.len())
1125    /// }
1126    ///
1127    /// let cache = Cache::new(100);
1128    ///
1129    /// // Spawn four threads.
1130    /// let threads: Vec<_> = (0..4_u8)
1131    ///     .map(|thread_id| {
1132    ///         let my_cache = cache.clone();
1133    ///         thread::spawn(move || {
1134    ///             println!("Thread {thread_id} started.");
1135    ///
1136    ///             // Try to insert and get the value for key1. Although all four
1137    ///             // threads will call `optionally_get_with` at the same time,
1138    ///             // get_file_size() must be called only once.
1139    ///             let value = my_cache.optionally_get_with(
1140    ///                 "key1",
1141    ///                 || get_file_size(thread_id, "./Cargo.toml"),
1142    ///             );
1143    ///
1144    ///             // Ensure the value exists now.
1145    ///             assert!(value.is_some());
1146    ///             assert!(my_cache.get(&"key1").is_some());
1147    ///
1148    ///             println!(
1149    ///                 "Thread {thread_id} got the value. (len: {})",
1150    ///                 value.unwrap()
1151    ///             );
1152    ///         })
1153    ///     })
1154    ///     .collect();
1155    ///
1156    /// // Wait all threads to complete.
1157    /// threads
1158    ///     .into_iter()
1159    ///     .for_each(|t| t.join().expect("Thread failed"));
1160    /// ```
1161    ///
1162    /// **Result**
1163    ///
1164    /// - `get_file_size()` was called exactly once by thread 0.
1165    /// - Other threads were blocked until thread 0 inserted the value.
1166    ///
1167    /// ```console
1168    /// Thread 0 started.
1169    /// Thread 1 started.
1170    /// Thread 2 started.
1171    /// get_file_size() called by thread 0.
1172    /// Thread 3 started.
1173    /// Thread 2 got the value. (len: 1466)
1174    /// Thread 0 got the value. (len: 1466)
1175    /// Thread 1 got the value. (len: 1466)
1176    /// Thread 3 got the value. (len: 1466)
1177    /// ```
1178    ///
1179    /// # Panics
1180    ///
    /// This method panics when the `init` closure has panicked. When it happens,
    /// only the caller whose `init` closure panicked will get the panic (e.g. only
    /// thread 0 in the above sample). If there are other calls in progress (e.g.
    /// thread 1, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closures.
1186    ///
1187    pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
1188    where
1189        F: FnOnce() -> Option<V>,
1190    {
1191        let hash = self.base.hash(&key);
1192        let key = Arc::new(key);
1193
1194        self.get_or_optionally_insert_with_hash_and_fun(key, hash, init, false)
1195            .map(Entry::into_value)
1196    }
1197
1198    /// Similar to [`optionally_get_with`](#method.optionally_get_with), but instead
1199    /// of passing an owned key, you can pass a reference to the key. If the key does
1200    /// not exist in the cache, the key will be cloned to create new entry in the
1201    /// cache.
1202    pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
1203    where
1204        F: FnOnce() -> Option<V>,
1205        K: Borrow<Q>,
1206        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1207    {
1208        let hash = self.base.hash(key);
1209        self.get_or_optionally_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1210            .map(Entry::into_value)
1211    }
1212
1213    pub(super) fn get_or_optionally_insert_with_hash_and_fun<F>(
1214        &self,
1215        key: Arc<K>,
1216        hash: u64,
1217        init: F,
1218        need_key: bool,
1219    ) -> Option<Entry<K, V>>
1220    where
1221        F: FnOnce() -> Option<V>,
1222    {
1223        let entry = self.get_with_hash(&key, hash, need_key);
1224        if entry.is_some() {
1225            return entry;
1226        }
1227
1228        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1229    }
1230
1231    pub(super) fn get_or_optionally_insert_with_hash_by_ref_and_fun<F, Q>(
1232        &self,
1233        key: &Q,
1234        hash: u64,
1235        init: F,
1236        need_key: bool,
1237    ) -> Option<Entry<K, V>>
1238    where
1239        F: FnOnce() -> Option<V>,
1240        K: Borrow<Q>,
1241        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1242    {
1243        let entry = self.get_with_hash(key, hash, need_key);
1244        if entry.is_some() {
1245            return entry;
1246        }
1247
1248        let key = Arc::new(key.to_owned());
1249        self.optionally_insert_with_hash_and_fun(key, hash, init, need_key)
1250    }
1251
1252    pub(super) fn optionally_insert_with_hash_and_fun<F>(
1253        &self,
1254        key: Arc<K>,
1255        hash: u64,
1256        init: F,
1257        need_key: bool,
1258    ) -> Option<Entry<K, V>>
1259    where
1260        F: FnOnce() -> Option<V>,
1261    {
1262        let get = || {
1263            let ignore_if = None as Option<&mut fn(&V) -> bool>;
1264            self.base
1265                .get_with_hash_without_recording(&key, hash, ignore_if)
1266        };
1267        let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1268
1269        let k = if need_key {
1270            Some(Arc::clone(&key))
1271        } else {
1272            None
1273        };
1274
1275        let type_id = ValueInitializer::<K, V, S>::type_id_for_optionally_get_with();
1276        let post_init = ValueInitializer::<K, V, S>::post_init_for_optionally_get_with;
1277
1278        match self
1279            .value_initializer
1280            .try_init_or_read(&key, type_id, get, init, insert, post_init)
1281        {
1282            InitResult::Initialized(v) => {
1283                crossbeam_epoch::pin().flush();
1284                Some(Entry::new(k, v, true, false))
1285            }
1286            InitResult::ReadExisting(v) => Some(Entry::new(k, v, false, false)),
1287            InitResult::InitErr(_) => {
1288                crossbeam_epoch::pin().flush();
1289                None
1290            }
1291        }
1292    }
1293
1294    /// Returns a _clone_ of the value corresponding to the key. If the value does
1295    /// not exist, evaluates the `init` closure, and inserts the value if `Ok(value)`
1296    /// was returned. If `Err(_)` was returned from the closure, this method does not
1297    /// insert a value and returns the `Err` wrapped by [`std::sync::Arc`][std-arc].
1298    ///
1299    /// [std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
1300    ///
1301    /// # Concurrent calls on the same key
1302    ///
1303    /// This method guarantees that concurrent calls on the same not-existing key are
1304    /// coalesced into one evaluation of the `init` closure (as long as these
1305    /// closures return the same error type). Only one of the calls evaluates its
1306    /// closure, and other calls wait for that closure to complete.
1307    ///
1308    /// The following code snippet demonstrates this behavior:
1309    ///
1310    /// ```rust
1311    /// use moka::sync::Cache;
1312    /// use std::{path::Path, thread};
1313    ///
1314    /// /// This function tries to get the file size in bytes.
1315    /// fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Result<u64, std::io::Error> {
1316    ///     println!("get_file_size() called by thread {thread_id}.");
1317    ///     Ok(std::fs::metadata(path)?.len())
1318    /// }
1319    ///
1320    /// let cache = Cache::new(100);
1321    ///
1322    /// // Spawn four threads.
1323    /// let threads: Vec<_> = (0..4_u8)
1324    ///     .map(|thread_id| {
1325    ///         let my_cache = cache.clone();
1326    ///         thread::spawn(move || {
1327    ///             println!("Thread {thread_id} started.");
1328    ///
1329    ///             // Try to insert and get the value for key1. Although all four
1330    ///             // threads will call `try_get_with` at the same time,
1331    ///             // get_file_size() must be called only once.
1332    ///             let value = my_cache.try_get_with(
1333    ///                 "key1",
1334    ///                 || get_file_size(thread_id, "./Cargo.toml"),
1335    ///             );
1336    ///
1337    ///             // Ensure the value exists now.
1338    ///             assert!(value.is_ok());
1339    ///             assert!(my_cache.get(&"key1").is_some());
1340    ///
1341    ///             println!(
1342    ///                 "Thread {thread_id} got the value. (len: {})",
1343    ///                 value.unwrap()
1344    ///             );
1345    ///         })
1346    ///     })
1347    ///     .collect();
1348    ///
1349    /// // Wait all threads to complete.
1350    /// threads
1351    ///     .into_iter()
1352    ///     .for_each(|t| t.join().expect("Thread failed"));
1353    /// ```
1354    ///
1355    /// **Result**
1356    ///
1357    /// - `get_file_size()` was called exactly once by thread 1.
1358    /// - Other threads were blocked until thread 1 inserted the value.
1359    ///
1360    /// ```console
1361    /// Thread 1 started.
1362    /// Thread 2 started.
1363    /// get_file_size() called by thread 1.
1364    /// Thread 3 started.
1365    /// Thread 0 started.
1366    /// Thread 2 got the value. (len: 1466)
1367    /// Thread 0 got the value. (len: 1466)
1368    /// Thread 1 got the value. (len: 1466)
1369    /// Thread 3 got the value. (len: 1466)
1370    /// ```
1371    ///
1372    /// # Panics
1373    ///
1374    /// This method panics when the `init` closure has panicked. When it happens,
1375    /// only the caller whose `init` closure panicked will get the panic (e.g. only
1376    /// thread 1 in the above sample). If there are other calls in progress (e.g.
    /// thread 0, 2 and 3 above), this method will restart and resolve one of the
    /// remaining `init` closures.
1379    ///
1380    pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
1381    where
1382        F: FnOnce() -> Result<V, E>,
1383        E: Send + Sync + 'static,
1384    {
1385        let hash = self.base.hash(&key);
1386        let key = Arc::new(key);
1387        self.get_or_try_insert_with_hash_and_fun(key, hash, init, false)
1388            .map(Entry::into_value)
1389    }
1390
1391    /// Similar to [`try_get_with`](#method.try_get_with), but instead of passing an
1392    /// owned key, you can pass a reference to the key. If the key does not exist in
1393    /// the cache, the key will be cloned to create new entry in the cache.
1394    pub fn try_get_with_by_ref<F, E, Q>(&self, key: &Q, init: F) -> Result<V, Arc<E>>
1395    where
1396        F: FnOnce() -> Result<V, E>,
1397        E: Send + Sync + 'static,
1398        K: Borrow<Q>,
1399        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1400    {
1401        let hash = self.base.hash(key);
1402        self.get_or_try_insert_with_hash_by_ref_and_fun(key, hash, init, false)
1403            .map(Entry::into_value)
1404    }
1405
1406    pub(crate) fn get_or_try_insert_with_hash_and_fun<F, E>(
1407        &self,
1408        key: Arc<K>,
1409        hash: u64,
1410        init: F,
1411        need_key: bool,
1412    ) -> Result<Entry<K, V>, Arc<E>>
1413    where
1414        F: FnOnce() -> Result<V, E>,
1415        E: Send + Sync + 'static,
1416    {
1417        if let Some(entry) = self.get_with_hash(&key, hash, need_key) {
1418            return Ok(entry);
1419        }
1420
1421        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1422    }
1423
1424    pub(crate) fn get_or_try_insert_with_hash_by_ref_and_fun<F, Q, E>(
1425        &self,
1426        key: &Q,
1427        hash: u64,
1428        init: F,
1429        need_key: bool,
1430    ) -> Result<Entry<K, V>, Arc<E>>
1431    where
1432        F: FnOnce() -> Result<V, E>,
1433        E: Send + Sync + 'static,
1434        K: Borrow<Q>,
1435        Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
1436    {
1437        if let Some(entry) = self.get_with_hash(key, hash, false) {
1438            return Ok(entry);
1439        }
1440
1441        let key = Arc::new(key.to_owned());
1442        self.try_insert_with_hash_and_fun(key, hash, init, need_key)
1443    }
1444
1445    pub(crate) fn try_insert_with_hash_and_fun<F, E>(
1446        &self,
1447        key: Arc<K>,
1448        hash: u64,
1449        init: F,
1450        need_key: bool,
1451    ) -> Result<Entry<K, V>, Arc<E>>
1452    where
1453        F: FnOnce() -> Result<V, E>,
1454        E: Send + Sync + 'static,
1455    {
1456        let get = || {
1457            let ignore_if = None as Option<&mut fn(&V) -> bool>;
1458            self.base
1459                .get_with_hash_without_recording(&key, hash, ignore_if)
1460        };
1461        let insert = |v| self.insert_with_hash(key.clone(), hash, v);
1462
1463        let k = if need_key {
1464            Some(Arc::clone(&key))
1465        } else {
1466            None
1467        };
1468
1469        let type_id = ValueInitializer::<K, V, S>::type_id_for_try_get_with::<E>();
1470        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_get_with;
1471
1472        match self
1473            .value_initializer
1474            .try_init_or_read(&key, type_id, get, init, insert, post_init)
1475        {
1476            InitResult::Initialized(v) => {
1477                crossbeam_epoch::pin().flush();
1478                Ok(Entry::new(k, v, true, false))
1479            }
1480            InitResult::ReadExisting(v) => Ok(Entry::new(k, v, false, false)),
1481            InitResult::InitErr(e) => {
1482                crossbeam_epoch::pin().flush();
1483                Err(e)
1484            }
1485        }
1486    }
1487
1488    /// Inserts a key-value pair into the cache.
1489    ///
1490    /// If the cache has this key present, the value is updated.
1491    pub fn insert(&self, key: K, value: V) {
1492        let hash = self.base.hash(&key);
1493        let key = Arc::new(key);
1494        self.insert_with_hash(key, hash, value);
1495    }
1496
1497    pub(crate) fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
1498        if self.base.is_map_disabled() {
1499            return;
1500        }
1501
1502        let (op, now) = self.base.do_insert_with_hash(key, hash, value);
1503        let hk = self.base.housekeeper.as_ref();
1504        Self::schedule_write_op(
1505            self.base.inner.as_ref(),
1506            &self.base.write_op_ch,
1507            op,
1508            now,
1509            hk,
1510        )
1511        .expect("Failed to insert");
1512    }
1513
1514    pub(crate) fn compute_with_hash_and_fun<F>(
1515        &self,
1516        key: Arc<K>,
1517        hash: u64,
1518        f: F,
1519    ) -> compute::CompResult<K, V>
1520    where
1521        F: FnOnce(Option<Entry<K, V>>) -> compute::Op<V>,
1522    {
1523        let post_init = ValueInitializer::<K, V, S>::post_init_for_compute_with;
1524        match self
1525            .value_initializer
1526            .try_compute(key, hash, self, f, post_init, true)
1527        {
1528            Ok(result) => result,
1529            Err(_) => unreachable!(),
1530        }
1531    }
1532
1533    pub(crate) fn try_compute_with_hash_and_fun<F, E>(
1534        &self,
1535        key: Arc<K>,
1536        hash: u64,
1537        f: F,
1538    ) -> Result<compute::CompResult<K, V>, E>
1539    where
1540        F: FnOnce(Option<Entry<K, V>>) -> Result<compute::Op<V>, E>,
1541        E: Send + Sync + 'static,
1542    {
1543        let post_init = ValueInitializer::<K, V, S>::post_init_for_try_compute_with;
1544        self.value_initializer
1545            .try_compute(key, hash, self, f, post_init, true)
1546    }
1547
1548    pub(crate) fn upsert_with_hash_and_fun<F>(&self, key: Arc<K>, hash: u64, f: F) -> Entry<K, V>
1549    where
1550        F: FnOnce(Option<Entry<K, V>>) -> V,
1551    {
1552        let post_init = ValueInitializer::<K, V, S>::post_init_for_upsert_with;
1553        match self
1554            .value_initializer
1555            .try_compute(key, hash, self, f, post_init, false)
1556        {
1557            Ok(CompResult::Inserted(entry) | CompResult::ReplacedWith(entry)) => entry,
1558            _ => unreachable!(),
1559        }
1560    }
1561
1562    /// Discards any cached value for the key.
1563    ///
    /// If you need to get the value that has been discarded, use the
1565    /// [`remove`](#method.remove) method instead.
1566    ///
1567    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1568    /// on the borrowed form _must_ match those for the key type.
1569    pub fn invalidate<Q>(&self, key: &Q)
1570    where
1571        K: Borrow<Q>,
1572        Q: Hash + Eq + ?Sized,
1573    {
1574        let hash = self.base.hash(key);
1575        self.invalidate_with_hash(key, hash, false);
1576    }
1577
1578    /// Discards any cached value for the key and returns a _clone_ of the value.
1579    ///
1580    /// If you do not need to get the value that has been discarded, use the
1581    /// [`invalidate`](#method.invalidate) method instead.
1582    ///
1583    /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
1584    /// on the borrowed form _must_ match those for the key type.
1585    pub fn remove<Q>(&self, key: &Q) -> Option<V>
1586    where
1587        K: Borrow<Q>,
1588        Q: Hash + Eq + ?Sized,
1589    {
1590        let hash = self.base.hash(key);
1591        self.invalidate_with_hash(key, hash, true)
1592    }
1593
1594    pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64, need_value: bool) -> Option<V>
1595    where
1596        K: Borrow<Q>,
1597        Q: Hash + Eq + ?Sized,
1598    {
1599        // Lock the key for removal if blocking removal notification is enabled.
1600        let mut kl = None;
1601        let mut klg = None;
1602        if self.base.is_removal_notifier_enabled() {
1603            // To lock the key, we have to get Arc<K> for key (&Q).
1604            //
1605            // TODO: Enhance this if possible. This is rather hack now because
1606            // it cannot prevent race conditions like this:
1607            //
1608            // 1. We miss the key because it does not exist. So we do not lock
1609            //    the key.
1610            // 2. Somebody else (other thread) inserts the key.
1611            // 3. We remove the entry for the key, but without the key lock!
1612            //
1613            if let Some(arc_key) = self.base.get_key_with_hash(key, hash) {
1614                kl = self.base.maybe_key_lock(&arc_key);
1615                klg = kl.as_ref().map(|kl| kl.lock());
1616            }
1617        }
1618
1619        match self.base.remove_entry(key, hash) {
1620            None => None,
1621            Some(kv) => {
1622                let now = self.base.current_time();
1623
1624                let info = kv.entry.entry_info();
1625                let entry_gen = info.incr_entry_gen();
1626
1627                if self.base.is_removal_notifier_enabled() {
1628                    self.base.notify_invalidate(&kv.key, &kv.entry);
1629                }
1630                // Drop the locks before scheduling write op to avoid a potential
1631                // dead lock. (Scheduling write can do spin lock when the queue is
1632                // full, and queue will be drained by the housekeeping thread that
1633                // can lock the same key)
1634                std::mem::drop(klg);
1635                std::mem::drop(kl);
1636
1637                let maybe_v = if need_value {
1638                    Some(kv.entry.value.clone())
1639                } else {
1640                    None
1641                };
1642
1643                let op = WriteOp::Remove {
1644                    kv_entry: kv,
1645                    entry_gen,
1646                };
1647                let hk = self.base.housekeeper.as_ref();
1648                Self::schedule_write_op(
1649                    self.base.inner.as_ref(),
1650                    &self.base.write_op_ch,
1651                    op,
1652                    now,
1653                    hk,
1654                )
1655                .expect("Failed to remove");
1656                crossbeam_epoch::pin().flush();
1657                maybe_v
1658            }
1659        }
1660    }
1661
    /// Discards all cached values.
    ///
    /// This method returns immediately by just setting the current time as the
    /// invalidation time. `get` and other retrieval methods are guaranteed not to
    /// return the entries inserted before or at the invalidation time.
    ///
    /// The actual removal of the invalidated entries is done as a maintenance task
    /// driven by a user thread. For more details, see
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
    /// level documentation.
    ///
    /// Like the `invalidate` method, this method does not clear the historic
    /// popularity estimator of keys, so the record of client attempts to retrieve
    /// an item is retained.
    pub fn invalidate_all(&self) {
        // All of the work is delegated to the underlying base cache.
        self.base.invalidate_all();
    }
    /// Discards cached values that satisfy a predicate.
    ///
    /// `invalidate_entries_if` takes a closure that returns `true` or `false`. The
    /// closure is called against each cached entry inserted before or at the time
    /// when this method was called. If the closure returns `true`, that entry will
    /// be evicted from the cache.
    ///
    /// This method returns immediately without actually removing the invalidated
    /// entries. Instead, it just registers the predicate with the cache, together
    /// with the time when this method was called. The actual removal of the
    /// invalidated entries is done as a maintenance task driven by a user thread.
    /// For more details, see
    /// [the Maintenance Tasks section](../index.html#maintenance-tasks) in the crate
    /// level documentation.
    ///
    /// Also the `get` and other retrieval methods will apply the closure to a cached
    /// entry to determine if it should have been invalidated. Therefore, it is
    /// guaranteed that these methods never return invalidated values.
    ///
    /// Like the `invalidate` method, this method does not clear the historic
    /// popularity estimator of keys, so the record of client attempts to retrieve
    /// an item is retained.
    ///
    /// # Errors
    ///
    /// You must call
    /// [`CacheBuilder::support_invalidation_closures`][support-invalidation-closures]
    /// at the cache creation time as the cache needs to maintain additional internal
    /// data structures to support this method. Otherwise, calling this method will
    /// fail with a
    /// [`PredicateError::InvalidationClosuresDisabled`][invalidation-disabled-error].
    ///
    /// [support-invalidation-closures]:
    ///     ./struct.CacheBuilder.html#method.support_invalidation_closures
    /// [invalidation-disabled-error]:
    ///     ../enum.PredicateError.html#variant.InvalidationClosuresDisabled
    pub fn invalidate_entries_if<F>(&self, predicate: F) -> Result<PredicateId, PredicateError>
    where
        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
    {
        // The base cache takes the predicate behind an `Arc`.
        self.base.invalidate_entries_if(Arc::new(predicate))
    }
1719
    /// Crate-internal variant of `invalidate_entries_if` that accepts a predicate
    /// already wrapped in an `Arc`, and passes it to the base cache as is.
    ///
    /// Returns whatever `BaseCache::invalidate_entries_if` returns: the
    /// `PredicateId` on success, or a `PredicateError` on failure.
    pub(crate) fn invalidate_entries_with_arc_fun<F>(
        &self,
        predicate: Arc<F>,
    ) -> Result<PredicateId, PredicateError>
    where
        F: Fn(&K, &V) -> bool + Send + Sync + 'static,
    {
        self.base.invalidate_entries_if(predicate)
    }
1729
    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
    /// value.
    ///
    /// Iterators do not block concurrent reads and writes on the cache. An entry can
    /// be inserted into, invalidated or evicted from a cache while iterators are
    /// alive on the same cache.
    ///
    /// Unlike the `get` method, visiting entries via an iterator does not update the
    /// historic popularity estimator or reset idle timers for keys.
    ///
    /// # Guarantees
    ///
    /// In order to allow concurrent access to the cache, the iterator's `next`
    /// method does _not_ guarantee the following:
    ///
    /// - It does not guarantee to return a key-value pair (an entry) if its key has
    ///   been inserted to the cache _after_ the iterator was created.
    ///   - Such an entry may or may not be returned depending on key's hash and
    ///     timing.
    ///
    /// and the `next` method guarantees the following:
    ///
    /// - It guarantees not to return the same entry more than once.
    /// - It guarantees not to return an entry if it has been removed from the cache
    ///   after the iterator was created.
    ///     - Note: An entry can be removed for the following reasons:
    ///         - Manually invalidated.
    ///         - Expired (e.g. time-to-live).
    ///         - Evicted as the cache capacity exceeded.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use moka::sync::Cache;
    ///
    /// let cache = Cache::new(100);
    /// cache.insert("Julia", 14);
    ///
    /// let mut iter = cache.iter();
    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
    /// assert_eq!(*k, "Julia");
    /// assert_eq!(v, 14);
    ///
    /// assert!(iter.next().is_none());
    /// ```
    ///
    pub fn iter(&self) -> Iter<'_, K, V> {
        // The iterator is built over the base cache together with its number of
        // hash-table segments.
        Iter::with_single_cache_segment(&self.base, self.num_cht_segments())
    }
1780
    /// Performs any pending maintenance operations needed by the cache.
    ///
    /// This does nothing when the cache has no housekeeper.
    pub fn run_pending_tasks(&self) {
        if let Some(hk) = &self.base.housekeeper {
            hk.run_pending_tasks(&*self.base.inner);
        }
    }
1787}
1788
/// Allows a `&Cache` to be used directly in a `for` loop; equivalent to calling
/// `Cache::iter`.
impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Each item is a key paired with a value of type `V`.
    type Item = (Arc<K>, V);

    type IntoIter = Iter<'a, K, V>;

    /// Returns the same iterator as `Cache::iter`.
    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
1803
//
// Iterator support
//
// Every method of this impl simply forwards to the same-named method of the
// underlying base cache.
impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the number of hash-table segments reported by the base cache.
    fn num_cht_segments(&self) -> usize {
        self.base.num_cht_segments()
    }

    /// Looks up `key` through the base cache's `scanning_get`.
    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
        self.base.scanning_get(key)
    }

    /// Returns the keys that the base cache reports for segment `cht_segment`.
    // NOTE(review): presumably `None` means the segment index is out of range;
    // confirm against `BaseCache::keys`.
    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
        self.base.keys(cht_segment)
    }
}
1825
1826//
1827// private methods
1828//
1829impl<K, V, S> Cache<K, V, S>
1830where
1831    K: Hash + Eq + Send + Sync + 'static,
1832    V: Clone + Send + Sync + 'static,
1833    S: BuildHasher + Clone + Send + Sync + 'static,
1834{
1835    // TODO: Like future::Cache, move this method to BaseCache.
1836    #[inline]
1837    fn schedule_write_op(
1838        inner: &impl InnerSync,
1839        ch: &Sender<WriteOp<K, V>>,
1840        op: WriteOp<K, V>,
1841        now: Instant,
1842        housekeeper: Option<&HouseKeeperArc>,
1843    ) -> Result<(), TrySendError<WriteOp<K, V>>> {
1844        let mut op = op;
1845
1846        // NOTES:
1847        // - This will block when the channel is full.
1848        // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`,
1849        //   but we got a notable performance degradation.
1850        loop {
1851            BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper);
1852            match ch.try_send(op) {
1853                Ok(()) => break,
1854                Err(TrySendError::Full(op1)) => {
1855                    op = op1;
1856                    std::thread::sleep(Duration::from_micros(WRITE_RETRY_INTERVAL_MICROS));
1857                }
1858                Err(e @ TrySendError::Disconnected(_)) => return Err(e),
1859            }
1860        }
1861        Ok(())
1862    }
1863}
1864
// For unit tests.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S> {
    /// Returns `true` when `entry_count()` reports zero entries.
    pub(crate) fn is_table_empty(&self) -> bool {
        self.entry_count() == 0
    }

    /// Returns `true` when the value initializer reports zero waiters.
    pub(crate) fn is_waiter_map_empty(&self) -> bool {
        self.value_initializer.waiter_count() == 0
    }
}
1876
// Test-only helpers that forward to the same-named methods of the base cache.
#[cfg(test)]
impl<K, V, S> Cache<K, V, S>
where
    K: Hash + Eq + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns the invalidation predicate count reported by the base cache.
    pub(crate) fn invalidation_predicate_count(&self) -> usize {
        self.base.invalidation_predicate_count()
    }

    /// Forwards to the base cache's `reconfigure_for_testing`.
    pub(crate) fn reconfigure_for_testing(&mut self) {
        self.base.reconfigure_for_testing();
    }

    /// Returns whether the base cache reports its key-locks map as empty.
    pub(crate) fn key_locks_map_is_empty(&self) -> bool {
        self.base.key_locks_map_is_empty()
    }
}
1896
1897// To see the debug prints, run test as `cargo test -- --nocapture`
1898#[cfg(test)]
1899mod tests {
1900    use super::Cache;
1901    use crate::{
1902        common::{time::Clock, HousekeeperConfig},
1903        notification::RemovalCause,
1904        policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
1905        Expiry,
1906    };
1907
1908    use parking_lot::Mutex;
1909    use std::{
1910        convert::Infallible,
1911        sync::{
1912            atomic::{AtomicU8, Ordering},
1913            Arc,
1914        },
1915        time::{Duration, Instant as StdInstant},
1916    };
1917
    /// A cache created with `Cache::new(0)` must never expose an inserted entry,
    /// neither right after `insert` nor after the pending tasks have run.
    #[test]
    fn max_capacity_zero() {
        let mut cache = Cache::new(0);
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(0, ());

        // The entry must be invisible immediately after the insert...
        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).is_none());
        cache.run_pending_tasks();
        // ...and must still be invisible once the pending tasks have run.
        assert!(!cache.contains_key(&0));
        assert!(cache.get(&0).is_none());
        assert_eq!(cache.entry_count(), 0)
    }
1935
    /// Exercises `insert`, `get`, `contains_key`, `invalidate` and `remove` from a
    /// single thread on a cache with `max_capacity(3)` and no explicit eviction
    /// policy, and checks the notifications delivered to the eviction listener.
    #[test]
    fn basic_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // counts: a -> 1, b -> 1

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        // counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // counts: a -> 2, b -> 2, c -> 1

        // "d" should not be admitted because its frequency is too low.
        cache.insert("d", "david"); //   count: d -> 0
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", "david");
        expected.push((Arc::new("d"), "david", RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 2

        // "d" should be admitted and "c" should be evicted
        // because d's frequency is higher than c's.
        cache.insert("d", "dennis");
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("dennis"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Explicit invalidation must be reported with `RemovalCause::Explicit`.
        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        // Removing an absent key returns `None`; removing a present key returns
        // its value and is also reported as an explicit removal.
        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("dennis"));
        expected.push((Arc::new("d"), "dennis", RemovalCause::Explicit));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2021
    /// Same scenario as `basic_single_thread`, but with the cache configured with
    /// `EvictionPolicy::lru()`: new entries are always admitted and the least
    /// recently used entry is the one expected to be evicted.
    #[test]
    fn basic_lru_single_thread() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(3)
            .eviction_policy(EvictionPolicy::lru())
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        cache.run_pending_tasks();
        // a -> b

        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"c"));
        cache.run_pending_tasks();
        // a -> b -> c

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // c -> a -> b

        // "d" should be admitted because the cache uses the LRU strategy.
        cache.insert("d", "david");
        // "c" is the LRU and should have been evicted.
        expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
        cache.run_pending_tasks();

        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));
        cache.run_pending_tasks();
        // a -> b -> d

        cache.invalidate(&"b");
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        cache.run_pending_tasks();
        // a -> d
        assert_eq!(cache.get(&"b"), None);
        assert!(!cache.contains_key(&"b"));

        assert!(cache.remove(&"b").is_none());
        assert_eq!(cache.remove(&"d"), Some("david"));
        expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
        cache.run_pending_tasks();
        // a
        assert_eq!(cache.get(&"d"), None);
        assert!(!cache.contains_key(&"d"));

        cache.insert("e", "emily");
        cache.insert("f", "frank");
        // "a" should be evicted because it is the LRU.
        cache.insert("g", "gina");
        expected.push((Arc::new("a"), "alice", RemovalCause::Size));
        cache.run_pending_tasks();
        // e -> f -> g
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"e"), Some("emily"));
        assert_eq!(cache.get(&"f"), Some("frank"));
        assert_eq!(cache.get(&"g"), Some("gina"));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"e"));
        assert!(cache.contains_key(&"f"));
        assert!(cache.contains_key(&"g"));

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2116
    /// Exercises eviction on a cache bounded by total weight (`max_capacity(31)`
    /// with a weigher) rather than by entry count, and checks the notifications
    /// delivered to the eviction listener as well as the final entry count and
    /// weighted size.
    #[test]
    fn size_aware_eviction() {
        // The weight of an entry is the `u32` stored in its value tuple.
        let weigher = |_k: &&str, v: &(&str, u32)| v.1;

        let alice = ("alice", 10);
        let bob = ("bob", 15);
        let bill = ("bill", 20);
        let cindy = ("cindy", 5);
        let david = ("david", 15);
        let dennis = ("dennis", 15);

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(31)
            .weigher(weigher)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", alice);
        cache.insert("b", bob);
        assert_eq!(cache.get(&"a"), Some(alice));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert_eq!(cache.get(&"b"), Some(bob));
        cache.run_pending_tasks();
        // order (LRU -> MRU) and counts: a -> 1, b -> 1

        cache.insert("c", cindy);
        assert_eq!(cache.get(&"c"), Some(cindy));
        assert!(cache.contains_key(&"c"));
        // order and counts: a -> 1, b -> 1, c -> 1
        cache.run_pending_tasks();

        assert!(cache.contains_key(&"a"));
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert!(cache.contains_key(&"b"));
        cache.run_pending_tasks();
        // order and counts: c -> 1, a -> 2, b -> 2

        // To enter "d" (weight: 15), it needs to evict "c" (w: 5) and "a" (w: 10).
        // "d" must have higher count than 3, which is the aggregated count
        // of "a" and "c".
        cache.insert("d", david); //   count: d -> 0
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 1
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 2

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"d"), None); //   d -> 3
        assert!(!cache.contains_key(&"d"));

        cache.insert("d", david);
        expected.push((Arc::new("d"), david, RemovalCause::Size));
        cache.run_pending_tasks();
        assert!(!cache.contains_key(&"d"));
        assert_eq!(cache.get(&"d"), None); //   d -> 4

        // Finally "d" should be admitted by evicting "c" and "a".
        cache.insert("d", dennis);
        expected.push((Arc::new("c"), cindy, RemovalCause::Size));
        expected.push((Arc::new("a"), alice, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), None);
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"c"), None);
        assert_eq!(cache.get(&"d"), Some(dennis));
        assert!(!cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        // Update "b" with "bill" (w: 15 -> 20). This should evict "d" (w: 15).
        cache.insert("b", bill);
        expected.push((Arc::new("b"), bob, RemovalCause::Replaced));
        expected.push((Arc::new("d"), dennis, RemovalCause::Size));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"b"), Some(bill));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Re-add "a" (w: 10) and update "b" with "bob" (w: 20 -> 15).
        cache.insert("a", alice);
        cache.insert("b", bob);
        expected.push((Arc::new("b"), bill, RemovalCause::Replaced));
        cache.run_pending_tasks();
        assert_eq!(cache.get(&"a"), Some(alice));
        assert_eq!(cache.get(&"b"), Some(bob));
        assert_eq!(cache.get(&"d"), None);
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"d"));

        // Verify the sizes: "a" (w: 10) + "b" (w: 15) = 25.
        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.weighted_size(), 25);

        verify_notification_vec(&cache, actual, &expected);
        assert!(cache.key_locks_map_is_empty());
    }
2239
    /// Runs the same insert / get / invalidate sequence from four threads sharing
    /// clones of one cache, then checks that key 10 (invalidated last by every
    /// thread) is gone and key 20 (never invalidated) is present.
    #[test]
    fn basic_multi_threads() {
        let num_threads = 4;
        let cache = Cache::new(100);

        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
        // The handles are collected first so that all threads are spawned before
        // any of them is joined.
        #[allow(clippy::needless_collect)]
        let handles = (0..num_threads)
            .map(|id| {
                let cache = cache.clone();
                std::thread::spawn(move || {
                    cache.insert(10, format!("{id}-100"));
                    cache.get(&10);
                    cache.insert(20, format!("{id}-200"));
                    cache.invalidate(&10);
                })
            })
            .collect::<Vec<_>>();

        handles.into_iter().for_each(|h| h.join().expect("Failed"));

        assert!(cache.get(&10).is_none());
        assert!(cache.get(&20).is_some());
        assert!(!cache.contains_key(&10));
        assert!(cache.contains_key(&20));
    }
2266
    /// Checks that `invalidate_all` hides every entry inserted before the call,
    /// reports each of them to the eviction listener with
    /// `RemovalCause::Explicit`, and leaves entries inserted afterwards intact.
    #[test]
    fn invalidate_all() {
        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .eviction_listener(listener)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert("a", "alice");
        cache.insert("b", "bob");
        cache.insert("c", "cindy");
        assert_eq!(cache.get(&"a"), Some("alice"));
        assert_eq!(cache.get(&"b"), Some("bob"));
        assert_eq!(cache.get(&"c"), Some("cindy"));
        assert!(cache.contains_key(&"a"));
        assert!(cache.contains_key(&"b"));
        assert!(cache.contains_key(&"c"));

        // `cache.run_pending_tasks()` is no longer needed here before invalidating. The
        // last modified timestamps of the entries were updated when they were inserted.
        // https://github.com/moka-rs/moka/issues/155

        cache.invalidate_all();
        expected.push((Arc::new("a"), "alice", RemovalCause::Explicit));
        expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
        expected.push((Arc::new("c"), "cindy", RemovalCause::Explicit));
        cache.run_pending_tasks();

        // An entry inserted after `invalidate_all` must survive.
        cache.insert("d", "david");
        cache.run_pending_tasks();

        assert!(cache.get(&"a").is_none());
        assert!(cache.get(&"b").is_none());
        assert!(cache.get(&"c").is_none());
        assert_eq!(cache.get(&"d"), Some("david"));
        assert!(!cache.contains_key(&"a"));
        assert!(!cache.contains_key(&"b"));
        assert!(!cache.contains_key(&"c"));
        assert!(cache.contains_key(&"d"));

        verify_notification_vec(&cache, actual, &expected);
    }
2321
    /// Checks `invalidate_entries_if` on a cache built with
    /// `support_invalidation_closures()` and a mock clock: entries matching a
    /// registered predicate are removed and reported with
    /// `RemovalCause::Explicit`, entries inserted after the predicate was
    /// registered survive, and the predicate count drops back to zero once the
    /// invalidation has been processed.
    #[test]
    fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
        use std::collections::HashSet;

        // The following `Vec`s will hold actual and expected notifications.
        let actual = Arc::new(Mutex::new(Vec::new()));
        let mut expected = Vec::new();

        // Create an eviction listener.
        let a1 = Arc::clone(&actual);
        let listener = move |k, v, cause| a1.lock().push((k, v, cause));

        let (clock, mock) = Clock::mock();

        // Create a cache with the eviction listener.
        let mut cache = Cache::builder()
            .max_capacity(100)
            .support_invalidation_closures()
            .eviction_listener(listener)
            .clock(clock)
            .build();
        cache.reconfigure_for_testing();

        // Make the cache exterior immutable.
        let cache = cache;

        cache.insert(0, "alice");
        cache.insert(1, "bob");
        cache.insert(2, "alex");
        cache.run_pending_tasks();

        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
        cache.run_pending_tasks();

        assert_eq!(cache.get(&0), Some("alice"));
        assert_eq!(cache.get(&1), Some("bob"));
        assert_eq!(cache.get(&2), Some("alex"));
        assert!(cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(cache.contains_key(&2));

        // Register a predicate that matches the values "alice" and "alex".
        let names = ["alice", "alex"].iter().cloned().collect::<HashSet<_>>();
        cache.invalidate_entries_if(move |_k, &v| names.contains(v))?;
        assert_eq!(cache.base.invalidation_predicate_count(), 1);
        expected.push((Arc::new(0), "alice", RemovalCause::Explicit));
        expected.push((Arc::new(2), "alex", RemovalCause::Explicit));

        mock.increment(Duration::from_secs(5)); // 10 secs from the start.

        cache.insert(3, "alice");

        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
        cache.run_pending_tasks(); // To submit the invalidation task.
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); // To process the task result.
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&0).is_none());
        assert!(cache.get(&2).is_none());
        assert_eq!(cache.get(&1), Some("bob"));
        // This should survive as it was inserted after calling invalidate_entries_if.
        assert_eq!(cache.get(&3), Some("alice"));

        assert!(!cache.contains_key(&0));
        assert!(cache.contains_key(&1));
        assert!(!cache.contains_key(&2));
        assert!(cache.contains_key(&3));

        assert_eq!(cache.entry_count(), 2);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        mock.increment(Duration::from_secs(5)); // 15 secs from the start.

        // Register two predicates at once; together they match both remaining
        // entries.
        cache.invalidate_entries_if(|_k, &v| v == "alice")?;
        cache.invalidate_entries_if(|_k, &v| v == "bob")?;
        assert_eq!(cache.invalidation_predicate_count(), 2);
        // key 1 was inserted before key 3.
        expected.push((Arc::new(1), "bob", RemovalCause::Explicit));
        expected.push((Arc::new(3), "alice", RemovalCause::Explicit));

        // Run the invalidation task and wait for it to finish. (TODO: Need a better way than sleeping)
        cache.run_pending_tasks(); // To submit the invalidation task.
        std::thread::sleep(Duration::from_millis(200));
        cache.run_pending_tasks(); // To process the task result.
        std::thread::sleep(Duration::from_millis(200));

        assert!(cache.get(&1).is_none());
        assert!(cache.get(&3).is_none());

        assert!(!cache.contains_key(&1));
        assert!(!cache.contains_key(&3));

        assert_eq!(cache.entry_count(), 0);
        assert_eq!(cache.invalidation_predicate_count(), 0);

        verify_notification_vec(&cache, actual, &expected);

        Ok(())
    }
2421
2422    #[test]
2423    fn time_to_live() {
2424        // The following `Vec`s will hold actual and expected notifications.
2425        let actual = Arc::new(Mutex::new(Vec::new()));
2426        let mut expected = Vec::new();
2427
2428        // Create an eviction listener.
2429        let a1 = Arc::clone(&actual);
2430        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2431
2432        let (clock, mock) = Clock::mock();
2433
2434        // Create a cache with the eviction listener.
2435        let mut cache = Cache::builder()
2436            .max_capacity(100)
2437            .time_to_live(Duration::from_secs(10))
2438            .eviction_listener(listener)
2439            .clock(clock)
2440            .build();
2441        cache.reconfigure_for_testing();
2442
2443        // Make the cache exterior immutable.
2444        let cache = cache;
2445
2446        cache.insert("a", "alice");
2447        cache.run_pending_tasks();
2448
2449        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2450        cache.run_pending_tasks();
2451
2452        assert_eq!(cache.get(&"a"), Some("alice"));
2453        assert!(cache.contains_key(&"a"));
2454
2455        mock.increment(Duration::from_secs(5)); // 10 secs.
2456        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2457        assert_eq!(cache.get(&"a"), None);
2458        assert!(!cache.contains_key(&"a"));
2459
2460        assert_eq!(cache.iter().count(), 0);
2461
2462        cache.run_pending_tasks();
2463        assert!(cache.is_table_empty());
2464
2465        cache.insert("b", "bob");
2466        cache.run_pending_tasks();
2467
2468        assert_eq!(cache.entry_count(), 1);
2469
2470        mock.increment(Duration::from_secs(5)); // 15 secs.
2471        cache.run_pending_tasks();
2472
2473        assert_eq!(cache.get(&"b"), Some("bob"));
2474        assert!(cache.contains_key(&"b"));
2475        assert_eq!(cache.entry_count(), 1);
2476
2477        cache.insert("b", "bill");
2478        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2479        cache.run_pending_tasks();
2480
2481        mock.increment(Duration::from_secs(5)); // 20 secs
2482        cache.run_pending_tasks();
2483
2484        assert_eq!(cache.get(&"b"), Some("bill"));
2485        assert!(cache.contains_key(&"b"));
2486        assert_eq!(cache.entry_count(), 1);
2487
2488        mock.increment(Duration::from_secs(5)); // 25 secs
2489        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2490
2491        assert_eq!(cache.get(&"a"), None);
2492        assert_eq!(cache.get(&"b"), None);
2493        assert!(!cache.contains_key(&"a"));
2494        assert!(!cache.contains_key(&"b"));
2495
2496        assert_eq!(cache.iter().count(), 0);
2497
2498        cache.run_pending_tasks();
2499        assert!(cache.is_table_empty());
2500
2501        verify_notification_vec(&cache, actual, &expected);
2502    }
2503
2504    #[test]
2505    fn time_to_idle() {
2506        // The following `Vec`s will hold actual and expected notifications.
2507        let actual = Arc::new(Mutex::new(Vec::new()));
2508        let mut expected = Vec::new();
2509
2510        // Create an eviction listener.
2511        let a1 = Arc::clone(&actual);
2512        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2513
2514        let (clock, mock) = Clock::mock();
2515
2516        // Create a cache with the eviction listener.
2517        let mut cache = Cache::builder()
2518            .max_capacity(100)
2519            .time_to_idle(Duration::from_secs(10))
2520            .eviction_listener(listener)
2521            .clock(clock)
2522            .build();
2523        cache.reconfigure_for_testing();
2524
2525        // Make the cache exterior immutable.
2526        let cache = cache;
2527
2528        cache.insert("a", "alice");
2529        cache.run_pending_tasks();
2530
2531        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2532        cache.run_pending_tasks();
2533
2534        assert_eq!(cache.get(&"a"), Some("alice"));
2535
2536        mock.increment(Duration::from_secs(5)); // 10 secs.
2537        cache.run_pending_tasks();
2538
2539        cache.insert("b", "bob");
2540        cache.run_pending_tasks();
2541
2542        assert_eq!(cache.entry_count(), 2);
2543
2544        mock.increment(Duration::from_secs(2)); // 12 secs.
2545        cache.run_pending_tasks();
2546
2547        // contains_key does not reset the idle timer for the key.
2548        assert!(cache.contains_key(&"a"));
2549        assert!(cache.contains_key(&"b"));
2550        cache.run_pending_tasks();
2551
2552        assert_eq!(cache.entry_count(), 2);
2553
2554        mock.increment(Duration::from_secs(3)); // 15 secs.
2555        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2556
2557        assert_eq!(cache.get(&"a"), None);
2558        assert_eq!(cache.get(&"b"), Some("bob"));
2559        assert!(!cache.contains_key(&"a"));
2560        assert!(cache.contains_key(&"b"));
2561
2562        assert_eq!(cache.iter().count(), 1);
2563
2564        cache.run_pending_tasks();
2565        assert_eq!(cache.entry_count(), 1);
2566
2567        mock.increment(Duration::from_secs(10)); // 25 secs
2568        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2569
2570        assert_eq!(cache.get(&"a"), None);
2571        assert_eq!(cache.get(&"b"), None);
2572        assert!(!cache.contains_key(&"a"));
2573        assert!(!cache.contains_key(&"b"));
2574
2575        assert_eq!(cache.iter().count(), 0);
2576
2577        cache.run_pending_tasks();
2578        assert!(cache.is_table_empty());
2579
2580        verify_notification_vec(&cache, actual, &expected);
2581    }
2582
2583    // https://github.com/moka-rs/moka/issues/359
2584    #[test]
2585    fn ensure_access_time_is_updated_immediately_after_read() {
2586        let (clock, mock) = Clock::mock();
2587        let mut cache = Cache::builder()
2588            .max_capacity(10)
2589            .time_to_idle(Duration::from_secs(5))
2590            .clock(clock)
2591            .build();
2592        cache.reconfigure_for_testing();
2593
2594        // Make the cache exterior immutable.
2595        let cache = cache;
2596
2597        cache.insert(1, 1);
2598
2599        mock.increment(Duration::from_secs(4));
2600        assert_eq!(cache.get(&1), Some(1));
2601
2602        mock.increment(Duration::from_secs(2));
2603        assert_eq!(cache.get(&1), Some(1));
2604        cache.run_pending_tasks();
2605        assert_eq!(cache.get(&1), Some(1));
2606    }
2607
2608    #[test]
2609    fn time_to_live_by_expiry_type() {
2610        // Define an expiry type.
2611        struct MyExpiry {
2612            counters: Arc<ExpiryCallCounters>,
2613        }
2614
2615        impl MyExpiry {
2616            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2617                Self { counters }
2618            }
2619        }
2620
2621        impl Expiry<&str, &str> for MyExpiry {
2622            fn expire_after_create(
2623                &self,
2624                _key: &&str,
2625                _value: &&str,
2626                _current_time: StdInstant,
2627            ) -> Option<Duration> {
2628                self.counters.incl_actual_creations();
2629                Some(Duration::from_secs(10))
2630            }
2631
2632            fn expire_after_update(
2633                &self,
2634                _key: &&str,
2635                _value: &&str,
2636                _current_time: StdInstant,
2637                _current_duration: Option<Duration>,
2638            ) -> Option<Duration> {
2639                self.counters.incl_actual_updates();
2640                Some(Duration::from_secs(10))
2641            }
2642        }
2643
2644        // The following `Vec`s will hold actual and expected notifications.
2645        let actual = Arc::new(Mutex::new(Vec::new()));
2646        let mut expected = Vec::new();
2647
2648        // Create expiry counters and the expiry.
2649        let expiry_counters = Arc::new(ExpiryCallCounters::default());
2650        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
2651
2652        // Create an eviction listener.
2653        let a1 = Arc::clone(&actual);
2654        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2655
2656        let (clock, mock) = Clock::mock();
2657
2658        // Create a cache with the eviction listener.
2659        let mut cache = Cache::builder()
2660            .max_capacity(100)
2661            .expire_after(expiry)
2662            .eviction_listener(listener)
2663            .clock(clock)
2664            .build();
2665        cache.reconfigure_for_testing();
2666
2667        // Make the cache exterior immutable.
2668        let cache = cache;
2669
2670        cache.insert("a", "alice");
2671        expiry_counters.incl_expected_creations();
2672        cache.run_pending_tasks();
2673
2674        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2675        cache.run_pending_tasks();
2676
2677        assert_eq!(cache.get(&"a"), Some("alice"));
2678        assert!(cache.contains_key(&"a"));
2679
2680        mock.increment(Duration::from_secs(5)); // 10 secs.
2681        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2682        assert_eq!(cache.get(&"a"), None);
2683        assert!(!cache.contains_key(&"a"));
2684
2685        assert_eq!(cache.iter().count(), 0);
2686
2687        cache.run_pending_tasks();
2688        assert!(cache.is_table_empty());
2689
2690        cache.insert("b", "bob");
2691        expiry_counters.incl_expected_creations();
2692        cache.run_pending_tasks();
2693
2694        assert_eq!(cache.entry_count(), 1);
2695
2696        mock.increment(Duration::from_secs(5)); // 15 secs.
2697        cache.run_pending_tasks();
2698
2699        assert_eq!(cache.get(&"b"), Some("bob"));
2700        assert!(cache.contains_key(&"b"));
2701        assert_eq!(cache.entry_count(), 1);
2702
2703        cache.insert("b", "bill");
2704        expected.push((Arc::new("b"), "bob", RemovalCause::Replaced));
2705        expiry_counters.incl_expected_updates();
2706        cache.run_pending_tasks();
2707
2708        mock.increment(Duration::from_secs(5)); // 20 secs
2709        cache.run_pending_tasks();
2710
2711        assert_eq!(cache.get(&"b"), Some("bill"));
2712        assert!(cache.contains_key(&"b"));
2713        assert_eq!(cache.entry_count(), 1);
2714
2715        mock.increment(Duration::from_secs(5)); // 25 secs
2716        expected.push((Arc::new("b"), "bill", RemovalCause::Expired));
2717
2718        assert_eq!(cache.get(&"a"), None);
2719        assert_eq!(cache.get(&"b"), None);
2720        assert!(!cache.contains_key(&"a"));
2721        assert!(!cache.contains_key(&"b"));
2722
2723        assert_eq!(cache.iter().count(), 0);
2724
2725        cache.run_pending_tasks();
2726        assert!(cache.is_table_empty());
2727
2728        expiry_counters.verify();
2729        verify_notification_vec(&cache, actual, &expected);
2730    }
2731
2732    #[test]
2733    fn time_to_idle_by_expiry_type() {
2734        // Define an expiry type.
2735        struct MyExpiry {
2736            counters: Arc<ExpiryCallCounters>,
2737        }
2738
2739        impl MyExpiry {
2740            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2741                Self { counters }
2742            }
2743        }
2744
2745        impl Expiry<&str, &str> for MyExpiry {
2746            fn expire_after_read(
2747                &self,
2748                _key: &&str,
2749                _value: &&str,
2750                _current_time: StdInstant,
2751                _current_duration: Option<Duration>,
2752                _last_modified_at: StdInstant,
2753            ) -> Option<Duration> {
2754                self.counters.incl_actual_reads();
2755                Some(Duration::from_secs(10))
2756            }
2757        }
2758
2759        // The following `Vec`s will hold actual and expected notifications.
2760        let actual = Arc::new(Mutex::new(Vec::new()));
2761        let mut expected = Vec::new();
2762
2763        // Create expiry counters and the expiry.
2764        let expiry_counters = Arc::new(ExpiryCallCounters::default());
2765        let expiry = MyExpiry::new(Arc::clone(&expiry_counters));
2766
2767        // Create an eviction listener.
2768        let a1 = Arc::clone(&actual);
2769        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
2770
2771        let (clock, mock) = Clock::mock();
2772
2773        // Create a cache with the eviction listener.
2774        let mut cache = Cache::builder()
2775            .max_capacity(100)
2776            .expire_after(expiry)
2777            .eviction_listener(listener)
2778            .clock(clock)
2779            .build();
2780        cache.reconfigure_for_testing();
2781
2782        // Make the cache exterior immutable.
2783        let cache = cache;
2784
2785        cache.insert("a", "alice");
2786        cache.run_pending_tasks();
2787
2788        mock.increment(Duration::from_secs(5)); // 5 secs from the start.
2789        cache.run_pending_tasks();
2790
2791        assert_eq!(cache.get(&"a"), Some("alice"));
2792        expiry_counters.incl_expected_reads();
2793
2794        mock.increment(Duration::from_secs(5)); // 10 secs.
2795        cache.run_pending_tasks();
2796
2797        cache.insert("b", "bob");
2798        cache.run_pending_tasks();
2799
2800        assert_eq!(cache.entry_count(), 2);
2801
2802        mock.increment(Duration::from_secs(2)); // 12 secs.
2803        cache.run_pending_tasks();
2804
2805        // contains_key does not reset the idle timer for the key.
2806        assert!(cache.contains_key(&"a"));
2807        assert!(cache.contains_key(&"b"));
2808        cache.run_pending_tasks();
2809
2810        assert_eq!(cache.entry_count(), 2);
2811
2812        mock.increment(Duration::from_secs(3)); // 15 secs.
2813        expected.push((Arc::new("a"), "alice", RemovalCause::Expired));
2814
2815        assert_eq!(cache.get(&"a"), None);
2816        assert_eq!(cache.get(&"b"), Some("bob"));
2817        expiry_counters.incl_expected_reads();
2818        assert!(!cache.contains_key(&"a"));
2819        assert!(cache.contains_key(&"b"));
2820
2821        assert_eq!(cache.iter().count(), 1);
2822
2823        cache.run_pending_tasks();
2824        assert_eq!(cache.entry_count(), 1);
2825
2826        mock.increment(Duration::from_secs(10)); // 25 secs
2827        expected.push((Arc::new("b"), "bob", RemovalCause::Expired));
2828
2829        assert_eq!(cache.get(&"a"), None);
2830        assert_eq!(cache.get(&"b"), None);
2831        assert!(!cache.contains_key(&"a"));
2832        assert!(!cache.contains_key(&"b"));
2833
2834        assert_eq!(cache.iter().count(), 0);
2835
2836        cache.run_pending_tasks();
2837        assert!(cache.is_table_empty());
2838
2839        expiry_counters.verify();
2840        verify_notification_vec(&cache, actual, &expected);
2841    }
2842
2843    /// Verify that the `Expiry::expire_after_read()` method is called in `get_with`
2844    /// only when the key was already present in the cache.
2845    #[test]
2846    fn test_expiry_using_get_with() {
2847        // Define an expiry type, which always return `None`.
2848        struct NoExpiry {
2849            counters: Arc<ExpiryCallCounters>,
2850        }
2851
2852        impl NoExpiry {
2853            fn new(counters: Arc<ExpiryCallCounters>) -> Self {
2854                Self { counters }
2855            }
2856        }
2857
2858        impl Expiry<&str, &str> for NoExpiry {
2859            fn expire_after_create(
2860                &self,
2861                _key: &&str,
2862                _value: &&str,
2863                _current_time: StdInstant,
2864            ) -> Option<Duration> {
2865                self.counters.incl_actual_creations();
2866                None
2867            }
2868
2869            fn expire_after_read(
2870                &self,
2871                _key: &&str,
2872                _value: &&str,
2873                _current_time: StdInstant,
2874                _current_duration: Option<Duration>,
2875                _last_modified_at: StdInstant,
2876            ) -> Option<Duration> {
2877                self.counters.incl_actual_reads();
2878                None
2879            }
2880
2881            fn expire_after_update(
2882                &self,
2883                _key: &&str,
2884                _value: &&str,
2885                _current_time: StdInstant,
2886                _current_duration: Option<Duration>,
2887            ) -> Option<Duration> {
2888                unreachable!("The `expire_after_update()` method should not be called.");
2889            }
2890        }
2891
2892        // Create expiry counters and the expiry.
2893        let expiry_counters = Arc::new(ExpiryCallCounters::default());
2894        let expiry = NoExpiry::new(Arc::clone(&expiry_counters));
2895
2896        // Create a cache with the expiry and eviction listener.
2897        let mut cache = Cache::builder()
2898            .max_capacity(100)
2899            .expire_after(expiry)
2900            .build();
2901        cache.reconfigure_for_testing();
2902
2903        // Make the cache exterior immutable.
2904        let cache = cache;
2905
2906        // The key is not present.
2907        cache.get_with("a", || "alice");
2908        expiry_counters.incl_expected_creations();
2909        cache.run_pending_tasks();
2910
2911        // The key is present.
2912        cache.get_with("a", || "alex");
2913        expiry_counters.incl_expected_reads();
2914        cache.run_pending_tasks();
2915
2916        // The key is not present.
2917        cache.invalidate("a");
2918        cache.get_with("a", || "amanda");
2919        expiry_counters.incl_expected_creations();
2920        cache.run_pending_tasks();
2921
2922        expiry_counters.verify();
2923    }
2924
2925    // https://github.com/moka-rs/moka/issues/345
2926    #[test]
2927    fn test_race_between_updating_entry_and_processing_its_write_ops() {
2928        let (clock, mock) = Clock::mock();
2929        let cache = Cache::builder()
2930            .max_capacity(2)
2931            .time_to_idle(Duration::from_secs(1))
2932            .clock(clock)
2933            .build();
2934
2935        cache.insert("a", "alice");
2936        cache.insert("b", "bob");
2937        cache.insert("c", "cathy"); // c1
2938        mock.increment(Duration::from_secs(2));
2939
2940        // The following `insert` will do the followings:
2941        // 1. Replaces current "c" (c1) in the concurrent hash table (cht).
2942        // 2. Runs the pending tasks implicitly.
2943        //    (1) "a" will be admitted.
2944        //    (2) "b" will be admitted.
2945        //    (3) c1 will be evicted by size constraint.
2946        //    (4) "a" will be evicted due to expiration.
2947        //    (5) "b" will be evicted due to expiration.
2948        // 3. Send its `WriteOp` log to the channel.
2949        cache.insert("c", "cindy"); // c2
2950
2951        // Remove "c" (c2) from the cht.
2952        assert_eq!(cache.remove(&"c"), Some("cindy")); // c-remove
2953
2954        mock.increment(Duration::from_secs(2));
2955
2956        // The following `run_pending_tasks` will do the followings:
2957        // 1. Admits "c" (c2) to the cache. (Create a node in the LRU deque)
2958        // 2. Because of c-remove, removes c2's node from the LRU deque.
2959        cache.run_pending_tasks();
2960        assert_eq!(cache.entry_count(), 0);
2961    }
2962
2963    #[test]
2964    fn test_race_between_recreating_entry_and_processing_its_write_ops() {
2965        let cache = Cache::builder().max_capacity(2).build();
2966
2967        cache.insert('a', "a");
2968        cache.insert('b', "b");
2969        cache.run_pending_tasks();
2970
2971        cache.insert('c', "c1"); // (a) `EntryInfo` 1, gen: 1
2972        assert!(cache.remove(&'a').is_some()); // (b)
2973        assert!(cache.remove(&'b').is_some()); // (c)
2974        assert!(cache.remove(&'c').is_some()); // (d) `EntryInfo` 1, gen: 2
2975        cache.insert('c', "c2"); // (e) `EntryInfo` 2, gen: 1
2976
2977        // Now the `write_op_ch` channel contains the following `WriteOp`s:
2978        //
2979        // - 0: (a) insert "c1" (`EntryInfo` 1, gen: 1)
2980        // - 1: (b) remove "a"
2981        // - 2: (c) remove "b"
2982        // - 3: (d) remove "c1" (`EntryInfo` 1, gen: 2)
2983        // - 4: (e) insert "c2" (`EntryInfo` 2, gen: 1)
2984        //
2985        // 0 for "c1" is going to be rejected because the cache is full. Let's ensure
2986        // processing 0 must not remove "c2" from the concurrent hash table. (Their
2987        // gen are the same, but `EntryInfo`s are different)
2988        cache.run_pending_tasks();
2989        assert_eq!(cache.get(&'c'), Some("c2"));
2990    }
2991
2992    #[test]
2993    fn test_iter() {
2994        const NUM_KEYS: usize = 50;
2995
2996        fn make_value(key: usize) -> String {
2997            format!("val: {key}")
2998        }
2999
3000        let cache = Cache::builder()
3001            .max_capacity(100)
3002            .time_to_idle(Duration::from_secs(10))
3003            .build();
3004
3005        for key in 0..NUM_KEYS {
3006            cache.insert(key, make_value(key));
3007        }
3008
3009        let mut key_set = std::collections::HashSet::new();
3010
3011        for (key, value) in &cache {
3012            assert_eq!(value, make_value(*key));
3013
3014            key_set.insert(*key);
3015        }
3016
3017        // Ensure there are no missing or duplicate keys in the iteration.
3018        assert_eq!(key_set.len(), NUM_KEYS);
3019    }
3020
3021    /// Runs 16 threads at the same time and ensures no deadlock occurs.
3022    ///
3023    /// - Eight of the threads will update key-values in the cache.
3024    /// - Eight others will iterate the cache.
3025    ///
3026    #[test]
3027    fn test_iter_multi_threads() {
3028        use std::collections::HashSet;
3029
3030        const NUM_KEYS: usize = 1024;
3031        const NUM_THREADS: usize = 16;
3032
3033        fn make_value(key: usize) -> String {
3034            format!("val: {key}")
3035        }
3036
3037        let cache = Cache::builder()
3038            .max_capacity(2048)
3039            .time_to_idle(Duration::from_secs(10))
3040            .build();
3041
3042        // Initialize the cache.
3043        for key in 0..NUM_KEYS {
3044            cache.insert(key, make_value(key));
3045        }
3046
3047        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
3048        let write_lock = rw_lock.write().unwrap();
3049
3050        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
3051        #[allow(clippy::needless_collect)]
3052        let handles = (0..NUM_THREADS)
3053            .map(|n| {
3054                let cache = cache.clone();
3055                let rw_lock = Arc::clone(&rw_lock);
3056
3057                if n % 2 == 0 {
3058                    // This thread will update the cache.
3059                    std::thread::spawn(move || {
3060                        let read_lock = rw_lock.read().unwrap();
3061                        for key in 0..NUM_KEYS {
3062                            // TODO: Update keys in a random order?
3063                            cache.insert(key, make_value(key));
3064                        }
3065                        std::mem::drop(read_lock);
3066                    })
3067                } else {
3068                    // This thread will iterate the cache.
3069                    std::thread::spawn(move || {
3070                        let read_lock = rw_lock.read().unwrap();
3071                        let mut key_set = HashSet::new();
3072                        for (key, value) in &cache {
3073                            assert_eq!(value, make_value(*key));
3074                            key_set.insert(*key);
3075                        }
3076                        // Ensure there are no missing or duplicate keys in the iteration.
3077                        assert_eq!(key_set.len(), NUM_KEYS);
3078                        std::mem::drop(read_lock);
3079                    })
3080                }
3081            })
3082            .collect::<Vec<_>>();
3083
3084        // Let these threads to run by releasing the write lock.
3085        std::mem::drop(write_lock);
3086
3087        handles.into_iter().for_each(|h| h.join().expect("Failed"));
3088
3089        // Ensure there are no missing or duplicate keys in the iteration.
3090        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
3091        assert_eq!(key_set.len(), NUM_KEYS);
3092    }
3093
3094    #[test]
3095    fn get_with() {
3096        use std::thread::{sleep, spawn};
3097
3098        let cache = Cache::new(100);
3099        const KEY: u32 = 0;
3100
3101        // This test will run five threads:
3102        //
3103        // Thread1 will be the first thread to call `get_with` for a key, so its init
3104        // closure will be evaluated and then a &str value "thread1" will be inserted
3105        // to the cache.
3106        let thread1 = {
3107            let cache1 = cache.clone();
3108            spawn(move || {
3109                // Call `get_with` immediately.
3110                let v = cache1.get_with(KEY, || {
3111                    // Wait for 300 ms and return a &str value.
3112                    sleep(Duration::from_millis(300));
3113                    "thread1"
3114                });
3115                assert_eq!(v, "thread1");
3116            })
3117        };
3118
3119        // Thread2 will be the second thread to call `get_with` for the same key, so
3120        // its init closure will not be evaluated. Once thread1's init closure
3121        // finishes, it will get the value inserted by thread1's init closure.
3122        let thread2 = {
3123            let cache2 = cache.clone();
3124            spawn(move || {
3125                // Wait for 100 ms before calling `get_with`.
3126                sleep(Duration::from_millis(100));
3127                let v = cache2.get_with(KEY, || unreachable!());
3128                assert_eq!(v, "thread1");
3129            })
3130        };
3131
3132        // Thread3 will be the third thread to call `get_with` for the same key. By
3133        // the time it calls, thread1's init closure should have finished already and
3134        // the value should be already inserted to the cache. So its init closure
3135        // will not be evaluated and will get the value insert by thread1's init
3136        // closure immediately.
3137        let thread3 = {
3138            let cache3 = cache.clone();
3139            spawn(move || {
3140                // Wait for 400 ms before calling `get_with`.
3141                sleep(Duration::from_millis(400));
3142                let v = cache3.get_with(KEY, || unreachable!());
3143                assert_eq!(v, "thread1");
3144            })
3145        };
3146
3147        // Thread4 will call `get` for the same key. It will call when thread1's init
3148        // closure is still running, so it will get none for the key.
3149        let thread4 = {
3150            let cache4 = cache.clone();
3151            spawn(move || {
3152                // Wait for 200 ms before calling `get`.
3153                sleep(Duration::from_millis(200));
3154                let maybe_v = cache4.get(&KEY);
3155                assert!(maybe_v.is_none());
3156            })
3157        };
3158
3159        // Thread5 will call `get` for the same key. It will call after thread1's init
3160        // closure finished, so it will get the value insert by thread1's init closure.
3161        let thread5 = {
3162            let cache5 = cache.clone();
3163            spawn(move || {
3164                // Wait for 400 ms before calling `get`.
3165                sleep(Duration::from_millis(400));
3166                let maybe_v = cache5.get(&KEY);
3167                assert_eq!(maybe_v, Some("thread1"));
3168            })
3169        };
3170
3171        for t in [thread1, thread2, thread3, thread4, thread5] {
3172            t.join().expect("Failed to join");
3173        }
3174
3175        assert!(cache.is_waiter_map_empty());
3176    }
3177
3178    #[test]
3179    fn get_with_by_ref() {
3180        use std::thread::{sleep, spawn};
3181
3182        let cache = Cache::new(100);
3183        const KEY: &u32 = &0;
3184
3185        // This test will run five threads:
3186        //
3187        // Thread1 will be the first thread to call `get_with_by_ref` for a key, so
3188        // its init closure will be evaluated and then a &str value "thread1" will be
3189        // inserted to the cache.
3190        let thread1 = {
3191            let cache1 = cache.clone();
3192            spawn(move || {
3193                // Call `get_with_by_ref` immediately.
3194                let v = cache1.get_with_by_ref(KEY, || {
3195                    // Wait for 300 ms and return a &str value.
3196                    sleep(Duration::from_millis(300));
3197                    "thread1"
3198                });
3199                assert_eq!(v, "thread1");
3200            })
3201        };
3202
3203        // Thread2 will be the second thread to call `get_with_by_ref` for the same
3204        // key, so its init closure will not be evaluated. Once thread1's init
3205        // closure finishes, it will get the value inserted by thread1's init
3206        // closure.
3207        let thread2 = {
3208            let cache2 = cache.clone();
3209            spawn(move || {
3210                // Wait for 100 ms before calling `get_with_by_ref`.
3211                sleep(Duration::from_millis(100));
3212                let v = cache2.get_with_by_ref(KEY, || unreachable!());
3213                assert_eq!(v, "thread1");
3214            })
3215        };
3216
3217        // Thread3 will be the third thread to call `get_with_by_ref` for the same
3218        // key. By the time it calls, thread1's init closure should have finished
3219        // already and the value should be already inserted to the cache. So its init
3220        // closure will not be evaluated and will get the value insert by thread1's
3221        // init closure immediately.
3222        let thread3 = {
3223            let cache3 = cache.clone();
3224            spawn(move || {
3225                // Wait for 400 ms before calling `get_with_by_ref`.
3226                sleep(Duration::from_millis(400));
3227                let v = cache3.get_with_by_ref(KEY, || unreachable!());
3228                assert_eq!(v, "thread1");
3229            })
3230        };
3231
3232        // Thread4 will call `get` for the same key. It will call when thread1's init
3233        // closure is still running, so it will get none for the key.
3234        let thread4 = {
3235            let cache4 = cache.clone();
3236            spawn(move || {
3237                // Wait for 200 ms before calling `get`.
3238                sleep(Duration::from_millis(200));
3239                let maybe_v = cache4.get(KEY);
3240                assert!(maybe_v.is_none());
3241            })
3242        };
3243
3244        // Thread5 will call `get` for the same key. It will call after thread1's init
3245        // closure finished, so it will get the value insert by thread1's init closure.
3246        let thread5 = {
3247            let cache5 = cache.clone();
3248            spawn(move || {
3249                // Wait for 400 ms before calling `get`.
3250                sleep(Duration::from_millis(400));
3251                let maybe_v = cache5.get(KEY);
3252                assert_eq!(maybe_v, Some("thread1"));
3253            })
3254        };
3255
3256        for t in [thread1, thread2, thread3, thread4, thread5] {
3257            t.join().expect("Failed to join");
3258        }
3259
3260        assert!(cache.is_waiter_map_empty());
3261    }
3262
3263    #[test]
3264    fn entry_or_insert_with_if() {
3265        use std::thread::{sleep, spawn};
3266
3267        let cache = Cache::new(100);
3268        const KEY: u32 = 0;
3269
3270        // This test will run seven threads, all working on the same key:
3271        //
3272        // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
3273        // its init closure will be evaluated and then a &str value "thread1" will be
3274        // inserted to the cache.
3275        let thread1 = {
3276            let cache1 = cache.clone();
3277            spawn(move || {
3278                // Call `or_insert_with_if` immediately.
3279                let entry = cache1.entry(KEY).or_insert_with_if(
3280                    || {
3281                        // Wait for 300 ms and return a &str value.
3282                        sleep(Duration::from_millis(300));
3283                        "thread1"
3284                    },
3285                    |_v| unreachable!(),
3286                );
3287                // Entry should be fresh because our init closure should have been
3288                // evaluated.
3289                assert!(entry.is_fresh());
3290                assert_eq!(entry.into_value(), "thread1");
3291            })
3292        };
3293
3294        // Thread2 will be the second thread to call `or_insert_with_if` for the same
3295        // key, so its init closure will not be evaluated. Once thread1's init
3296        // closure finishes, it will get the value inserted by thread1's init
3297        // closure.
3298        let thread2 = {
3299            let cache2 = cache.clone();
3300            spawn(move || {
3301                // Wait for 100 ms before calling `or_insert_with_if`.
3302                sleep(Duration::from_millis(100));
3303                let entry = cache2
3304                    .entry(KEY)
3305                    .or_insert_with_if(|| unreachable!(), |_v| unreachable!());
3306                // Entry should not be fresh because thread1's init closure should
3307                // have been evaluated instead of ours.
3308                assert!(!entry.is_fresh());
3309                assert_eq!(entry.into_value(), "thread1");
3310            })
3311        };
3312
3313        // Thread3 will be the third thread to call `or_insert_with_if` for the same
3314        // key. By the time it calls, thread1's init closure should have finished
3315        // already and the value should be already inserted to the cache. Also
3316        // thread3's `replace_if` closure returns `false`. So its init closure will
3317        // not be evaluated and will get the value inserted by thread1's init closure
3318        // immediately.
3319        let thread3 = {
3320            let cache3 = cache.clone();
3321            spawn(move || {
3322                // Wait for 350 ms before calling `or_insert_with_if`.
3323                sleep(Duration::from_millis(350));
3324                let entry = cache3.entry(KEY).or_insert_with_if(
3325                    || unreachable!(),
3326                    |v| {
3327                        assert_eq!(v, &"thread1");
3328                        false
3329                    },
3330                );
3331                assert!(!entry.is_fresh());
3332                assert_eq!(entry.into_value(), "thread1");
3333            })
3334        };
3335
3336        // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
3337        // key. The value should have been already inserted to the cache by thread1.
3338        // However thread4's `replace_if` closure returns `true`. So its init closure
3339        // will be evaluated to replace the current value.
3340        let thread4 = {
3341            let cache4 = cache.clone();
3342            spawn(move || {
3343                // Wait for 400 ms before calling `or_insert_with_if`.
3344                sleep(Duration::from_millis(400));
3345                let entry = cache4.entry(KEY).or_insert_with_if(
3346                    || "thread4",
3347                    |v| {
3348                        assert_eq!(v, &"thread1");
3349                        true
3350                    },
3351                );
3352                assert!(entry.is_fresh());
3353                assert_eq!(entry.into_value(), "thread4");
3354            })
3355        };
3356
3357        // Thread5 will call `get` for the same key. It will call when thread1's init
3358        // closure is still running, so it will get none for the key.
3359        let thread5 = {
3360            let cache5 = cache.clone();
3361            spawn(move || {
3362                // Wait for 200 ms before calling `get`.
3363                sleep(Duration::from_millis(200));
3364                let maybe_v = cache5.get(&KEY);
3365                assert!(maybe_v.is_none());
3366            })
3367        };
3368
3369        // Thread6 will call `get` for the same key. It will call after thread1's init
3370        // closure finished but before thread4 replaces the value, so it will get "thread1".
3371        let thread6 = {
3372            let cache6 = cache.clone();
3373            spawn(move || {
3374                // Wait for 350 ms before calling `get`.
3375                sleep(Duration::from_millis(350));
3376                let maybe_v = cache6.get(&KEY);
3377                assert_eq!(maybe_v, Some("thread1"));
3378            })
3379        };
3380
3381        // Thread7 will call `get` for the same key. It will call after thread4's init
3382        // closure replaced the value, so it will get the value inserted by thread4.
3383        let thread7 = {
3384            let cache7 = cache.clone();
3385            spawn(move || {
3386                // Wait for 450 ms before calling `get`.
3387                sleep(Duration::from_millis(450));
3388                let maybe_v = cache7.get(&KEY);
3389                assert_eq!(maybe_v, Some("thread4"));
3390            })
3391        };
3392
3393        for t in [
3394            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
3395        ] {
3396            t.join().expect("Failed to join");
3397        }
3398
3399        assert!(cache.is_waiter_map_empty());
3400    }
3401
3402    #[test]
3403    fn entry_by_ref_or_insert_with_if() {
3404        use std::thread::{sleep, spawn};
3405
3406        let cache: Cache<u32, &str> = Cache::new(100);
3407        const KEY: &u32 = &0;
3408
3409        // This test will run seven threads, all working on the same key:
3410        //
3411        // Thread1 will be the first thread to call `or_insert_with_if` for a key, so
3412        // its init closure will be evaluated and then a &str value "thread1" will be
3413        // inserted to the cache.
3414        let thread1 = {
3415            let cache1 = cache.clone();
3416            spawn(move || {
3417                // Call `or_insert_with_if` immediately.
3418                let v = cache1
3419                    .entry_by_ref(KEY)
3420                    .or_insert_with_if(
3421                        || {
3422                            // Wait for 300 ms and return a &str value.
3423                            sleep(Duration::from_millis(300));
3424                            "thread1"
3425                        },
3426                        |_v| unreachable!(),
3427                    )
3428                    .into_value();
3429                assert_eq!(v, "thread1");
3430            })
3431        };
3432
3433        // Thread2 will be the second thread to call `or_insert_with_if` for the same
3434        // key, so its init closure will not be evaluated. Once thread1's init
3435        // closure finishes, it will get the value inserted by thread1's init
3436        // closure.
3437        let thread2 = {
3438            let cache2 = cache.clone();
3439            spawn(move || {
3440                // Wait for 100 ms before calling `or_insert_with_if`.
3441                sleep(Duration::from_millis(100));
3442                let v = cache2
3443                    .entry_by_ref(KEY)
3444                    .or_insert_with_if(|| unreachable!(), |_v| unreachable!())
3445                    .into_value();
3446                assert_eq!(v, "thread1");
3447            })
3448        };
3449
3450        // Thread3 will be the third thread to call `or_insert_with_if` for the same
3451        // key. By the time it calls, thread1's init closure should have finished
3452        // already and the value should be already inserted to the cache. Also
3453        // thread3's `replace_if` closure returns `false`. So its init closure will
3454        // not be evaluated and will get the value inserted by thread1's init closure
3455        // immediately.
3456        let thread3 = {
3457            let cache3 = cache.clone();
3458            spawn(move || {
3459                // Wait for 350 ms before calling `or_insert_with_if`.
3460                sleep(Duration::from_millis(350));
3461                let v = cache3
3462                    .entry_by_ref(KEY)
3463                    .or_insert_with_if(
3464                        || unreachable!(),
3465                        |v| {
3466                            assert_eq!(v, &"thread1");
3467                            false
3468                        },
3469                    )
3470                    .into_value();
3471                assert_eq!(v, "thread1");
3472            })
3473        };
3474
3475        // Thread4 will be the fourth thread to call `or_insert_with_if` for the same
3476        // key. The value should have been already inserted to the cache by
3477        // thread1. However thread4's `replace_if` closure returns `true`. So its
3478        // init closure will be evaluated to replace the current value.
3479        let thread4 = {
3480            let cache4 = cache.clone();
3481            spawn(move || {
3482                // Wait for 400 ms before calling `or_insert_with_if`.
3483                sleep(Duration::from_millis(400));
3484                let v = cache4
3485                    .entry_by_ref(KEY)
3486                    .or_insert_with_if(
3487                        || "thread4",
3488                        |v| {
3489                            assert_eq!(v, &"thread1");
3490                            true
3491                        },
3492                    )
3493                    .into_value();
3494                assert_eq!(v, "thread4");
3495            })
3496        };
3497
3498        // Thread5 will call `get` for the same key. It will call when thread1's init
3499        // closure is still running, so it will get none for the key.
3500        let thread5 = {
3501            let cache5 = cache.clone();
3502            spawn(move || {
3503                // Wait for 200 ms before calling `get`.
3504                sleep(Duration::from_millis(200));
3505                let maybe_v = cache5.get(KEY);
3506                assert!(maybe_v.is_none());
3507            })
3508        };
3509
3510        // Thread6 will call `get` for the same key. It will call after thread1's init
3511        // closure finished but before thread4 replaces the value, so it will get "thread1".
3512        let thread6 = {
3513            let cache6 = cache.clone();
3514            spawn(move || {
3515                // Wait for 350 ms before calling `get`.
3516                sleep(Duration::from_millis(350));
3517                let maybe_v = cache6.get(KEY);
3518                assert_eq!(maybe_v, Some("thread1"));
3519            })
3520        };
3521
3522        // Thread7 will call `get` for the same key. It will call after thread4's init
3523        // closure replaced the value, so it will get the value inserted by thread4.
3524        let thread7 = {
3525            let cache7 = cache.clone();
3526            spawn(move || {
3527                // Wait for 450 ms before calling `get`.
3528                sleep(Duration::from_millis(450));
3529                let maybe_v = cache7.get(KEY);
3530                assert_eq!(maybe_v, Some("thread4"));
3531            })
3532        };
3533
3534        for t in [
3535            thread1, thread2, thread3, thread4, thread5, thread6, thread7,
3536        ] {
3537            t.join().expect("Failed to join");
3538        }
3539
3540        assert!(cache.is_waiter_map_empty());
3541    }
3542
3543    #[test]
3544    fn try_get_with() {
3545        use std::{
3546            sync::Arc,
3547            thread::{sleep, spawn},
3548        };
3549
3550        // Note that MyError does not implement the std::error::Error trait, like
3551        // anyhow::Error.
3552        #[derive(Debug)]
3553        pub struct MyError(#[allow(dead_code)] String);
3554
3555        type MyResult<T> = Result<T, Arc<MyError>>;
3556
3557        let cache = Cache::new(100);
3558        const KEY: u32 = 0;
3559
3560        // This test will run eight threads, all working on the same key:
3561        //
3562        // Thread1 will be the first thread to call `try_get_with` for a key, so its
3563        // init closure will be evaluated and then an error will be returned. Nothing
3564        // will be inserted to the cache.
3565        let thread1 = {
3566            let cache1 = cache.clone();
3567            spawn(move || {
3568                // Call `try_get_with` immediately.
3569                let v = cache1.try_get_with(KEY, || {
3570                    // Wait for 300 ms and return an error.
3571                    sleep(Duration::from_millis(300));
3572                    Err(MyError("thread1 error".into()))
3573                });
3574                assert!(v.is_err());
3575            })
3576        };
3577
3578        // Thread2 will be the second thread to call `try_get_with` for the same key,
3579        // so its init closure will not be evaluated. Once thread1's init closure
3580        // finishes, it will get the same error value returned by thread1's init
3581        // closure.
3582        let thread2 = {
3583            let cache2 = cache.clone();
3584            spawn(move || {
3585                // Wait for 100 ms before calling `try_get_with`.
3586                sleep(Duration::from_millis(100));
3587                let v: MyResult<_> = cache2.try_get_with(KEY, || unreachable!());
3588                assert!(v.is_err());
3589            })
3590        };
3591
3592        // Thread3 will be the third thread to call `try_get_with` for the same key.
3593        // By the time it calls, thread1's init closure should have finished already,
3594        // but the key still does not exist in the cache. So its init closure will be
3595        // evaluated and then an `Ok(&str)` value will be returned. That value will be
3596        // inserted to the cache.
3597        let thread3 = {
3598            let cache3 = cache.clone();
3599            spawn(move || {
3600                // Wait for 400 ms before calling `try_get_with`.
3601                sleep(Duration::from_millis(400));
3602                let v: MyResult<_> = cache3.try_get_with(KEY, || {
3603                    // Wait for 300 ms and return an Ok(&str) value.
3604                    sleep(Duration::from_millis(300));
3605                    Ok("thread3")
3606                });
3607                assert_eq!(v.unwrap(), "thread3");
3608            })
3609        };
3610
3611        // Thread4 will be the fourth thread to call `try_get_with` for the same
3612        // key. So its init closure will not be evaluated. Once thread3's init
3613        // closure finishes, it will get the same `Ok(&str)` value.
3614        let thread4 = {
3615            let cache4 = cache.clone();
3616            spawn(move || {
3617                // Wait for 500 ms before calling `try_get_with`.
3618                sleep(Duration::from_millis(500));
3619                let v: MyResult<_> = cache4.try_get_with(KEY, || unreachable!());
3620                assert_eq!(v.unwrap(), "thread3");
3621            })
3622        };
3623
3624        // Thread5 will be the fifth thread to call `try_get_with` for the same
3625        // key. By the time it calls, thread3's init closure should have finished
3626        // already and the value should be already inserted to the cache. So its
3627        // init closure will not be evaluated and it will get the value inserted by
3628        // thread3's init closure immediately.
3629        let thread5 = {
3630            let cache5 = cache.clone();
3631            spawn(move || {
3632                // Wait for 800 ms before calling `try_get_with`.
3633                sleep(Duration::from_millis(800));
3634                let v: MyResult<_> = cache5.try_get_with(KEY, || unreachable!());
3635                assert_eq!(v.unwrap(), "thread3");
3636            })
3637        };
3638
3639        // Thread6 will call `get` for the same key. It will call when thread1's init
3640        // closure is still running, so it will get none for the key.
3641        let thread6 = {
3642            let cache6 = cache.clone();
3643            spawn(move || {
3644                // Wait for 200 ms before calling `get`.
3645                sleep(Duration::from_millis(200));
3646                let maybe_v = cache6.get(&KEY);
3647                assert!(maybe_v.is_none());
3648            })
3649        };
3650
3651        // Thread7 will call `get` for the same key. It will call after thread1's init
3652        // closure finished with an error. So it will get none for the key.
3653        let thread7 = {
3654            let cache7 = cache.clone();
3655            spawn(move || {
3656                // Wait for 400 ms before calling `get`.
3657                sleep(Duration::from_millis(400));
3658                let maybe_v = cache7.get(&KEY);
3659                assert!(maybe_v.is_none());
3660            })
3661        };
3662
3663        // Thread8 will call `get` for the same key. It will call after thread3's init
3664        // closure finished, so it will get the value inserted by thread3's init closure.
3665        let thread8 = {
3666            let cache8 = cache.clone();
3667            spawn(move || {
3668                // Wait for 800 ms before calling `get`.
3669                sleep(Duration::from_millis(800));
3670                let maybe_v = cache8.get(&KEY);
3671                assert_eq!(maybe_v, Some("thread3"));
3672            })
3673        };
3674
3675        for t in [
3676            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3677        ] {
3678            t.join().expect("Failed to join");
3679        }
3680
3681        assert!(cache.is_waiter_map_empty());
3682    }
3683
3684    #[test]
3685    fn try_get_with_by_ref() {
3686        use std::{
3687            sync::Arc,
3688            thread::{sleep, spawn},
3689        };
3690
3691        // Note that MyError does not implement the std::error::Error trait, like
3692        // anyhow::Error.
3693        #[derive(Debug)]
3694        pub struct MyError(#[allow(dead_code)] String);
3695
3696        type MyResult<T> = Result<T, Arc<MyError>>;
3697
3698        let cache = Cache::new(100);
3699        const KEY: &u32 = &0;
3700
3701        // This test will run eight threads, all working on the same key:
3702        //
3703        // Thread1 will be the first thread to call `try_get_with_by_ref` for a key,
3704        // so its init closure will be evaluated and then an error will be returned.
3705        // Nothing will be inserted to the cache.
3706        let thread1 = {
3707            let cache1 = cache.clone();
3708            spawn(move || {
3709                // Call `try_get_with_by_ref` immediately.
3710                let v = cache1.try_get_with_by_ref(KEY, || {
3711                    // Wait for 300 ms and return an error.
3712                    sleep(Duration::from_millis(300));
3713                    Err(MyError("thread1 error".into()))
3714                });
3715                assert!(v.is_err());
3716            })
3717        };
3718
3719        // Thread2 will be the second thread to call `try_get_with_by_ref` for the
3720        // same key, so its init closure will not be evaluated. Once thread1's init
3721        // closure finishes, it will get the same error value returned by thread1's
3722        // init closure.
3723        let thread2 = {
3724            let cache2 = cache.clone();
3725            spawn(move || {
3726                // Wait for 100 ms before calling `try_get_with_by_ref`.
3727                sleep(Duration::from_millis(100));
3728                let v: MyResult<_> = cache2.try_get_with_by_ref(KEY, || unreachable!());
3729                assert!(v.is_err());
3730            })
3731        };
3732
3733        // Thread3 will be the third thread to call `try_get_with_by_ref` for the
3734        // same key. By the time it calls, thread1's init closure should have
3735        // finished already, but the key still does not exist in the cache. So its
3736        // init closure will be evaluated and then an `Ok(&str)` value will be
3737        // returned. That value will be inserted to the cache.
3738        let thread3 = {
3739            let cache3 = cache.clone();
3740            spawn(move || {
3741                // Wait for 400 ms before calling `try_get_with_by_ref`.
3742                sleep(Duration::from_millis(400));
3743                let v: MyResult<_> = cache3.try_get_with_by_ref(KEY, || {
3744                    // Wait for 300 ms and return an Ok(&str) value.
3745                    sleep(Duration::from_millis(300));
3746                    Ok("thread3")
3747                });
3748                assert_eq!(v.unwrap(), "thread3");
3749            })
3750        };
3751
3752        // Thread4 will be the fourth thread to call `try_get_with_by_ref` for the
3753        // same key. So its init closure will not be evaluated. Once thread3's init
3754        // closure finishes, it will get the same `Ok(&str)` value.
3755        let thread4 = {
3756            let cache4 = cache.clone();
3757            spawn(move || {
3758                // Wait for 500 ms before calling `try_get_with_by_ref`.
3759                sleep(Duration::from_millis(500));
3760                let v: MyResult<_> = cache4.try_get_with_by_ref(KEY, || unreachable!());
3761                assert_eq!(v.unwrap(), "thread3");
3762            })
3763        };
3764
3765        // Thread5 will be the fifth thread to call `try_get_with_by_ref` for the
3766        // same key. By the time it calls, thread3's init closure should have
3767        // finished already and the value should be already inserted to the cache.
3768        // So its init closure will not be evaluated and it will get the value
3769        // inserted by thread3's init closure immediately.
3770        let thread5 = {
3771            let cache5 = cache.clone();
3772            spawn(move || {
3773                // Wait for 800 ms before calling `try_get_with_by_ref`.
3774                sleep(Duration::from_millis(800));
3775                let v: MyResult<_> = cache5.try_get_with_by_ref(KEY, || unreachable!());
3776                assert_eq!(v.unwrap(), "thread3");
3777            })
3778        };
3779
3780        // Thread6 will call `get` for the same key. It will call when thread1's init
3781        // closure is still running, so it will get none for the key.
3782        let thread6 = {
3783            let cache6 = cache.clone();
3784            spawn(move || {
3785                // Wait for 200 ms before calling `get`.
3786                sleep(Duration::from_millis(200));
3787                let maybe_v = cache6.get(KEY);
3788                assert!(maybe_v.is_none());
3789            })
3790        };
3791
3792        // Thread7 will call `get` for the same key. It will call after thread1's init
3793        // closure finished with an error. So it will get none for the key.
3794        let thread7 = {
3795            let cache7 = cache.clone();
3796            spawn(move || {
3797                // Wait for 400 ms before calling `get`.
3798                sleep(Duration::from_millis(400));
3799                let maybe_v = cache7.get(KEY);
3800                assert!(maybe_v.is_none());
3801            })
3802        };
3803
3804        // Thread8 will call `get` for the same key. It will call after thread3's init
3805        // closure finished, so it will get the value inserted by thread3's init closure.
3806        let thread8 = {
3807            let cache8 = cache.clone();
3808            spawn(move || {
3809                // Wait for 800 ms before calling `get`.
3810                sleep(Duration::from_millis(800));
3811                let maybe_v = cache8.get(KEY);
3812                assert_eq!(maybe_v, Some("thread3"));
3813            })
3814        };
3815
3816        for t in [
3817            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3818        ] {
3819            t.join().expect("Failed to join");
3820        }
3821
3822        assert!(cache.is_waiter_map_empty());
3823    }
3824
3825    #[test]
3826    fn optionally_get_with() {
3827        use std::thread::{sleep, spawn};
3828
3829        let cache = Cache::new(100);
3830        const KEY: u32 = 0;
3831
3832        // This test will run eight threads, all working on the same key:
3833        //
3834        // Thread1 will be the first thread to call `optionally_get_with` for a key,
3835        // so its init closure will be evaluated and then `None` will be returned.
3836        // Nothing will be inserted to the cache.
3837        let thread1 = {
3838            let cache1 = cache.clone();
3839            spawn(move || {
3840                // Call `optionally_get_with` immediately.
3841                let v = cache1.optionally_get_with(KEY, || {
3842                    // Wait for 300 ms and return `None`.
3843                    sleep(Duration::from_millis(300));
3844                    None
3845                });
3846                assert!(v.is_none());
3847            })
3848        };
3849
3850        // Thread2 will be the second thread to call `optionally_get_with` for the
3851        // same key, so its init closure will not be evaluated. Once thread1's init
3852        // closure finishes, it will get `None` too, as returned by thread1's init
3853        // closure.
3854        let thread2 = {
3855            let cache2 = cache.clone();
3856            spawn(move || {
3857                // Wait for 100 ms before calling `optionally_get_with`.
3858                sleep(Duration::from_millis(100));
3859                let v = cache2.optionally_get_with(KEY, || unreachable!());
3860                assert!(v.is_none());
3861            })
3862        };
3863
3864        // Thread3 will be the third thread to call `optionally_get_with` for the
3865        // same key. By the time it calls, thread1's init closure should have
3866        // finished already, but the key still does not exist in the cache. So its
3867        // init closure will be evaluated and then a `Some(&str)` value will be
3868        // returned. That value will be inserted to the cache.
3869        let thread3 = {
3870            let cache3 = cache.clone();
3871            spawn(move || {
3872                // Wait for 400 ms before calling `optionally_get_with`.
3873                sleep(Duration::from_millis(400));
3874                let v = cache3.optionally_get_with(KEY, || {
3875                    // Wait for 300 ms and return a Some(&str) value.
3876                    sleep(Duration::from_millis(300));
3877                    Some("thread3")
3878                });
3879                assert_eq!(v.unwrap(), "thread3");
3880            })
3881        };
3882
3883        // Thread4 will be the fourth thread to call `optionally_get_with` for the
3884        // same key. So its init closure will not be evaluated. Once thread3's init
3885        // closure finishes, it will get the same `Some(&str)` value.
3886        let thread4 = {
3887            let cache4 = cache.clone();
3888            spawn(move || {
3889                // Wait for 500 ms before calling `optionally_get_with`.
3890                sleep(Duration::from_millis(500));
3891                let v = cache4.optionally_get_with(KEY, || unreachable!());
3892                assert_eq!(v.unwrap(), "thread3");
3893            })
3894        };
3895
3896        // Thread5 will be the fifth thread to call `optionally_get_with` for the
3897        // same key. By the time it calls, thread3's init closure should have
3898        // finished already and the value should be already inserted to the cache.
3899        // So its init closure will not be evaluated and it will get the value
3900        // inserted by thread3's init closure immediately.
3901        let thread5 = {
3902            let cache5 = cache.clone();
3903            spawn(move || {
3904                // Wait for 800 ms before calling `optionally_get_with`.
3905                sleep(Duration::from_millis(800));
3906                let v = cache5.optionally_get_with(KEY, || unreachable!());
3907                assert_eq!(v.unwrap(), "thread3");
3908            })
3909        };
3910
3911        // Thread6 will call `get` for the same key. It will call when thread1's init
3912        // closure is still running, so it will get none for the key.
3913        let thread6 = {
3914            let cache6 = cache.clone();
3915            spawn(move || {
3916                // Wait for 200 ms before calling `get`.
3917                sleep(Duration::from_millis(200));
3918                let maybe_v = cache6.get(&KEY);
3919                assert!(maybe_v.is_none());
3920            })
3921        };
3922
3923        // Thread7 will call `get` for the same key. It will call after thread1's init
3924        // closure finished with `None`. So it will get none for the key.
3925        let thread7 = {
3926            let cache7 = cache.clone();
3927            spawn(move || {
3928                // Wait for 400 ms before calling `get`.
3929                sleep(Duration::from_millis(400));
3930                let maybe_v = cache7.get(&KEY);
3931                assert!(maybe_v.is_none());
3932            })
3933        };
3934
3935        // Thread8 will call `get` for the same key. It will call after thread3's init
3936        // closure finished, so it will get the value inserted by thread3's init closure.
3937        let thread8 = {
3938            let cache8 = cache.clone();
3939            spawn(move || {
3940                // Wait for 800 ms before calling `get`.
3941                sleep(Duration::from_millis(800));
3942                let maybe_v = cache8.get(&KEY);
3943                assert_eq!(maybe_v, Some("thread3"));
3944            })
3945        };
3946
3947        for t in [
3948            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
3949        ] {
3950            t.join().expect("Failed to join");
3951        }
3952
3953        assert!(cache.is_waiter_map_empty());
3954    }
3955
3956    #[test]
3957    fn optionally_get_with_by_ref() {
3958        use std::thread::{sleep, spawn};
3959
3960        let cache = Cache::new(100);
3961        const KEY: &u32 = &0;
3962
3963        // This test will run eight threads:
3964        //
3965        // Thread1 will be the first thread to call `optionally_get_with_by_ref` for
3966        // a key, so its init closure will be evaluated and then an error will be
3967        // returned. Nothing will be inserted to the cache.
3968        let thread1 = {
3969            let cache1 = cache.clone();
3970            spawn(move || {
3971                // Call `optionally_get_with_by_ref` immediately.
3972                let v = cache1.optionally_get_with_by_ref(KEY, || {
3973                    // Wait for 300 ms and return an error.
3974                    sleep(Duration::from_millis(300));
3975                    None
3976                });
3977                assert!(v.is_none());
3978            })
3979        };
3980
3981        // Thread2 will be the second thread to call `optionally_get_with_by_ref` for
3982        // the same key, so its init closure will not be evaluated. Once thread1's
3983        // init closure finishes, it will get the same error value returned by
3984        // thread1's init closure.
3985        let thread2 = {
3986            let cache2 = cache.clone();
3987            spawn(move || {
3988                // Wait for 100 ms before calling `optionally_get_with_by_ref`.
3989                sleep(Duration::from_millis(100));
3990                let v = cache2.optionally_get_with_by_ref(KEY, || unreachable!());
3991                assert!(v.is_none());
3992            })
3993        };
3994
3995        // Thread3 will be the third thread to call `get_with` for the same key. By
3996        // the time it calls, thread1's init closure should have finished already,
3997        // but the key still does not exist in the cache. So its init closure will be
3998        // evaluated and then an okay &str value will be returned. That value will be
3999        // inserted to the cache.
4000        let thread3 = {
4001            let cache3 = cache.clone();
4002            spawn(move || {
4003                // Wait for 400 ms before calling `optionally_get_with_by_ref`.
4004                sleep(Duration::from_millis(400));
4005                let v = cache3.optionally_get_with_by_ref(KEY, || {
4006                    // Wait for 300 ms and return an Ok(&str) value.
4007                    sleep(Duration::from_millis(300));
4008                    Some("thread3")
4009                });
4010                assert_eq!(v.unwrap(), "thread3");
4011            })
4012        };
4013
4014        // thread4 will be the fourth thread to call `optionally_get_with_by_ref` for
4015        // the same key. So its init closure will not be evaluated. Once thread3's
4016        // init closure finishes, it will get the same okay &str value.
4017        let thread4 = {
4018            let cache4 = cache.clone();
4019            spawn(move || {
4020                // Wait for 500 ms before calling `optionally_get_with_by_ref`.
4021                sleep(Duration::from_millis(500));
4022                let v = cache4.optionally_get_with_by_ref(KEY, || unreachable!());
4023                assert_eq!(v.unwrap(), "thread3");
4024            })
4025        };
4026
4027        // Thread5 will be the fifth thread to call `optionally_get_with_by_ref` for
4028        // the same key. So its init closure will not be evaluated. By the time it
4029        // calls, thread3's init closure should have finished already, so its init
4030        // closure will not be evaluated and will get the value insert by thread3's
4031        // init closure immediately.
4032        let thread5 = {
4033            let cache5 = cache.clone();
4034            spawn(move || {
4035                // Wait for 800 ms before calling `optionally_get_with_by_ref`.
4036                sleep(Duration::from_millis(800));
4037                let v = cache5.optionally_get_with_by_ref(KEY, || unreachable!());
4038                assert_eq!(v.unwrap(), "thread3");
4039            })
4040        };
4041
4042        // Thread6 will call `get` for the same key. It will call when thread1's init
4043        // closure is still running, so it will get none for the key.
4044        let thread6 = {
4045            let cache6 = cache.clone();
4046            spawn(move || {
4047                // Wait for 200 ms before calling `get`.
4048                sleep(Duration::from_millis(200));
4049                let maybe_v = cache6.get(KEY);
4050                assert!(maybe_v.is_none());
4051            })
4052        };
4053
4054        // Thread7 will call `get` for the same key. It will call after thread1's init
4055        // closure finished with an error. So it will get none for the key.
4056        let thread7 = {
4057            let cache7 = cache.clone();
4058            spawn(move || {
4059                // Wait for 400 ms before calling `get`.
4060                sleep(Duration::from_millis(400));
4061                let maybe_v = cache7.get(KEY);
4062                assert!(maybe_v.is_none());
4063            })
4064        };
4065
4066        // Thread8 will call `get` for the same key. It will call after thread3's init
4067        // closure finished, so it will get the value insert by thread3's init closure.
4068        let thread8 = {
4069            let cache8 = cache.clone();
4070            spawn(move || {
4071                // Wait for 800 ms before calling `get`.
4072                sleep(Duration::from_millis(800));
4073                let maybe_v = cache8.get(KEY);
4074                assert_eq!(maybe_v, Some("thread3"));
4075            })
4076        };
4077
4078        for t in [
4079            thread1, thread2, thread3, thread4, thread5, thread6, thread7, thread8,
4080        ] {
4081            t.join().expect("Failed to join");
4082        }
4083
4084        assert!(cache.is_waiter_map_empty());
4085    }
4086
4087    #[test]
4088    fn upsert_with() {
4089        use std::thread::{sleep, spawn};
4090
4091        let cache = Cache::new(100);
4092        const KEY: u32 = 0;
4093
4094        // Spawn three threads to call `and_upsert_with` for the same key and each
4095        // task increments the current value by 1. Ensure the key-level lock is
4096        // working by verifying the value is 3 after all threads finish.
4097        //
4098        // |        | thread 1 | thread 2 | thread 3 |
4099        // |--------|----------|----------|----------|
4100        // |   0 ms | get none |          |          |
4101        // | 100 ms |          | blocked  |          |
4102        // | 200 ms | insert 1 |          |          |
4103        // |        |          | get 1    |          |
4104        // | 300 ms |          |          | blocked  |
4105        // | 400 ms |          | insert 2 |          |
4106        // |        |          |          | get 2    |
4107        // | 500 ms |          |          | insert 3 |
4108
4109        let thread1 = {
4110            let cache1 = cache.clone();
4111            spawn(move || {
4112                cache1.entry(KEY).and_upsert_with(|maybe_entry| {
4113                    sleep(Duration::from_millis(200));
4114                    assert!(maybe_entry.is_none());
4115                    1
4116                })
4117            })
4118        };
4119
4120        let thread2 = {
4121            let cache2 = cache.clone();
4122            spawn(move || {
4123                sleep(Duration::from_millis(100));
4124                cache2.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
4125                    sleep(Duration::from_millis(200));
4126                    let entry = maybe_entry.expect("The entry should exist");
4127                    entry.into_value() + 1
4128                })
4129            })
4130        };
4131
4132        let thread3 = {
4133            let cache3 = cache.clone();
4134            spawn(move || {
4135                sleep(Duration::from_millis(300));
4136                cache3.entry_by_ref(&KEY).and_upsert_with(|maybe_entry| {
4137                    sleep(Duration::from_millis(100));
4138                    let entry = maybe_entry.expect("The entry should exist");
4139                    entry.into_value() + 1
4140                })
4141            })
4142        };
4143
4144        let ent1 = thread1.join().expect("Thread 1 should finish");
4145        let ent2 = thread2.join().expect("Thread 2 should finish");
4146        let ent3 = thread3.join().expect("Thread 3 should finish");
4147        assert_eq!(ent1.into_value(), 1);
4148        assert_eq!(ent2.into_value(), 2);
4149        assert_eq!(ent3.into_value(), 3);
4150
4151        assert_eq!(cache.get(&KEY), Some(3));
4152
4153        assert!(cache.is_waiter_map_empty());
4154    }
4155
4156    #[test]
4157    fn compute_with() {
4158        use crate::ops::compute;
4159        use std::{
4160            sync::RwLock,
4161            thread::{sleep, spawn},
4162        };
4163
4164        let cache = Cache::new(100);
4165        const KEY: u32 = 0;
4166
4167        // Spawn six threads to call `and_compute_with` for the same key. Ensure the
4168        // key-level lock is working by verifying the value after all threads finish.
4169        //
4170        // |         |  thread 1  |   thread 2    |  thread 3  | thread 4 |  thread 5  | thread 6 |
4171        // |---------|------------|---------------|------------|----------|------------|----------|
4172        // |    0 ms | get none   |               |            |          |            |          |
4173        // |  100 ms |            | blocked       |            |          |            |          |
4174        // |  200 ms | insert [1] |               |            |          |            |          |
4175        // |         |            | get [1]       |            |          |            |          |
4176        // |  300 ms |            |               | blocked    |          |            |          |
4177        // |  400 ms |            | insert [1, 2] |            |          |            |          |
4178        // |         |            |               | get [1, 2] |          |            |          |
4179        // |  500 ms |            |               |            | blocked  |            |          |
4180        // |  600 ms |            |               | remove     |          |            |          |
4181        // |         |            |               |            | get none |            |          |
4182        // |  700 ms |            |               |            |          | blocked    |          |
4183        // |  800 ms |            |               |            | nop      |            |          |
4184        // |         |            |               |            |          | get none   |          |
4185        // |  900 ms |            |               |            |          |            | blocked  |
4186        // | 1000 ms |            |               |            |          | insert [5] |          |
4187        // |         |            |               |            |          |            | get [5]  |
4188        // | 1100 ms |            |               |            |          |            | nop      |
4189
4190        let thread1 = {
4191            let cache1 = cache.clone();
4192            spawn(move || {
4193                cache1.entry(KEY).and_compute_with(|maybe_entry| {
4194                    sleep(Duration::from_millis(200));
4195                    assert!(maybe_entry.is_none());
4196                    compute::Op::Put(Arc::new(RwLock::new(vec![1])))
4197                })
4198            })
4199        };
4200
4201        let thread2 = {
4202            let cache2 = cache.clone();
4203            spawn(move || {
4204                sleep(Duration::from_millis(100));
4205                cache2.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
4206                    let entry = maybe_entry.expect("The entry should exist");
4207                    let value = entry.into_value();
4208                    assert_eq!(*value.read().unwrap(), vec![1]);
4209                    sleep(Duration::from_millis(200));
4210                    value.write().unwrap().push(2);
4211                    compute::Op::Put(value)
4212                })
4213            })
4214        };
4215
4216        let thread3 = {
4217            let cache3 = cache.clone();
4218            spawn(move || {
4219                sleep(Duration::from_millis(300));
4220                cache3.entry(KEY).and_compute_with(|maybe_entry| {
4221                    let entry = maybe_entry.expect("The entry should exist");
4222                    let value = entry.into_value();
4223                    assert_eq!(*value.read().unwrap(), vec![1, 2]);
4224                    sleep(Duration::from_millis(200));
4225                    compute::Op::Remove
4226                })
4227            })
4228        };
4229
4230        let thread4 = {
4231            let cache4 = cache.clone();
4232            spawn(move || {
4233                sleep(Duration::from_millis(500));
4234                cache4.entry(KEY).and_compute_with(|maybe_entry| {
4235                    assert!(maybe_entry.is_none());
4236                    sleep(Duration::from_millis(200));
4237                    compute::Op::Nop
4238                })
4239            })
4240        };
4241
4242        let thread5 = {
4243            let cache5 = cache.clone();
4244            spawn(move || {
4245                sleep(Duration::from_millis(700));
4246                cache5.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
4247                    assert!(maybe_entry.is_none());
4248                    sleep(Duration::from_millis(200));
4249                    compute::Op::Put(Arc::new(RwLock::new(vec![5])))
4250                })
4251            })
4252        };
4253
4254        let thread6 = {
4255            let cache6 = cache.clone();
4256            spawn(move || {
4257                sleep(Duration::from_millis(900));
4258                cache6.entry_by_ref(&KEY).and_compute_with(|maybe_entry| {
4259                    let entry = maybe_entry.expect("The entry should exist");
4260                    let value = entry.into_value();
4261                    assert_eq!(*value.read().unwrap(), vec![5]);
4262                    sleep(Duration::from_millis(100));
4263                    compute::Op::Nop
4264                })
4265            })
4266        };
4267
4268        let res1 = thread1.join().expect("Thread 1 should finish");
4269        let res2 = thread2.join().expect("Thread 2 should finish");
4270        let res3 = thread3.join().expect("Thread 3 should finish");
4271        let res4 = thread4.join().expect("Thread 4 should finish");
4272        let res5 = thread5.join().expect("Thread 5 should finish");
4273        let res6 = thread6.join().expect("Thread 6 should finish");
4274
4275        let compute::CompResult::Inserted(entry) = res1 else {
4276            panic!("Expected `Inserted`. Got {res1:?}")
4277        };
4278        assert_eq!(
4279            *entry.into_value().read().unwrap(),
4280            vec![1, 2] // The same Vec was modified by task2.
4281        );
4282
4283        let compute::CompResult::ReplacedWith(entry) = res2 else {
4284            panic!("Expected `ReplacedWith`. Got {res2:?}")
4285        };
4286        assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
4287
4288        let compute::CompResult::Removed(entry) = res3 else {
4289            panic!("Expected `Removed`. Got {res3:?}")
4290        };
4291        assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
4292
4293        let compute::CompResult::StillNone(key) = res4 else {
4294            panic!("Expected `StillNone`. Got {res4:?}")
4295        };
4296        assert_eq!(*key, KEY);
4297
4298        let compute::CompResult::Inserted(entry) = res5 else {
4299            panic!("Expected `Inserted`. Got {res5:?}")
4300        };
4301        assert_eq!(*entry.into_value().read().unwrap(), vec![5]);
4302
4303        let compute::CompResult::Unchanged(entry) = res6 else {
4304            panic!("Expected `Unchanged`. Got {res6:?}")
4305        };
4306        assert_eq!(*entry.into_value().read().unwrap(), vec![5]);
4307
4308        assert!(cache.is_waiter_map_empty());
4309    }
4310
4311    #[test]
4312    fn try_compute_with() {
4313        use crate::ops::compute;
4314        use std::{
4315            sync::RwLock,
4316            thread::{sleep, spawn},
4317        };
4318
4319        let cache: Cache<u32, Arc<RwLock<Vec<i32>>>> = Cache::new(100);
4320        const KEY: u32 = 0;
4321
4322        // Spawn four threads to call `and_try_compute_with` for the same key. Ensure
4323        // the key-level lock is working by verifying the value after all threads
4324        // finish.
4325        //
4326        // |         |  thread 1  |   thread 2    |  thread 3  | thread 4   |
4327        // |---------|------------|---------------|------------|------------|
4328        // |    0 ms | get none   |               |            |            |
4329        // |  100 ms |            | blocked       |            |            |
4330        // |  200 ms | insert [1] |               |            |            |
4331        // |         |            | get [1]       |            |            |
4332        // |  300 ms |            |               | blocked    |            |
4333        // |  400 ms |            | insert [1, 2] |            |            |
4334        // |         |            |               | get [1, 2] |            |
4335        // |  500 ms |            |               |            | blocked    |
4336        // |  600 ms |            |               | err        |            |
4337        // |         |            |               |            | get [1, 2] |
4338        // |  700 ms |            |               |            | remove     |
4339        //
4340        // This test is shorter than `compute_with` test because this one omits `Nop`
4341        // cases.
4342
4343        let thread1 = {
4344            let cache1 = cache.clone();
4345            spawn(move || {
4346                cache1.entry(KEY).and_try_compute_with(|maybe_entry| {
4347                    sleep(Duration::from_millis(200));
4348                    assert!(maybe_entry.is_none());
4349                    Ok(compute::Op::Put(Arc::new(RwLock::new(vec![1])))) as Result<_, ()>
4350                })
4351            })
4352        };
4353
4354        let thread2 = {
4355            let cache2 = cache.clone();
4356            spawn(move || {
4357                sleep(Duration::from_millis(100));
4358                cache2
4359                    .entry_by_ref(&KEY)
4360                    .and_try_compute_with(|maybe_entry| {
4361                        let entry = maybe_entry.expect("The entry should exist");
4362                        let value = entry.into_value();
4363                        assert_eq!(*value.read().unwrap(), vec![1]);
4364                        sleep(Duration::from_millis(200));
4365                        value.write().unwrap().push(2);
4366                        Ok(compute::Op::Put(value)) as Result<_, ()>
4367                    })
4368            })
4369        };
4370
4371        let thread3 = {
4372            let cache3 = cache.clone();
4373            spawn(move || {
4374                sleep(Duration::from_millis(300));
4375                cache3.entry(KEY).and_try_compute_with(|maybe_entry| {
4376                    let entry = maybe_entry.expect("The entry should exist");
4377                    let value = entry.into_value();
4378                    assert_eq!(*value.read().unwrap(), vec![1, 2]);
4379                    sleep(Duration::from_millis(200));
4380                    Err(())
4381                })
4382            })
4383        };
4384
4385        let thread4 = {
4386            let cache4 = cache.clone();
4387            spawn(move || {
4388                sleep(Duration::from_millis(500));
4389                cache4.entry(KEY).and_try_compute_with(|maybe_entry| {
4390                    let entry = maybe_entry.expect("The entry should exist");
4391                    let value = entry.into_value();
4392                    assert_eq!(*value.read().unwrap(), vec![1, 2]);
4393                    sleep(Duration::from_millis(100));
4394                    Ok(compute::Op::Remove) as Result<_, ()>
4395                })
4396            })
4397        };
4398
4399        let res1 = thread1.join().expect("Thread 1 should finish");
4400        let res2 = thread2.join().expect("Thread 2 should finish");
4401        let res3 = thread3.join().expect("Thread 3 should finish");
4402        let res4 = thread4.join().expect("Thread 4 should finish");
4403
4404        let Ok(compute::CompResult::Inserted(entry)) = res1 else {
4405            panic!("Expected `Inserted`. Got {res1:?}")
4406        };
4407        assert_eq!(
4408            *entry.into_value().read().unwrap(),
4409            vec![1, 2] // The same Vec was modified by task2.
4410        );
4411
4412        let Ok(compute::CompResult::ReplacedWith(entry)) = res2 else {
4413            panic!("Expected `ReplacedWith`. Got {res2:?}")
4414        };
4415        assert_eq!(*entry.into_value().read().unwrap(), vec![1, 2]);
4416
4417        assert!(res3.is_err());
4418
4419        let Ok(compute::CompResult::Removed(entry)) = res4 else {
4420            panic!("Expected `Removed`. Got {res4:?}")
4421        };
4422        assert_eq!(
4423            *entry.into_value().read().unwrap(),
4424            vec![1, 2] // Removed value.
4425        );
4426
4427        assert!(cache.is_waiter_map_empty());
4428    }
4429
4430    #[test]
4431    // https://github.com/moka-rs/moka/issues/43
4432    fn handle_panic_in_get_with() {
4433        use std::{sync::Barrier, thread};
4434
4435        let cache = Cache::new(16);
4436        let barrier = Arc::new(Barrier::new(2));
4437        {
4438            let cache_ref = cache.clone();
4439            let barrier_ref = barrier.clone();
4440            thread::spawn(move || {
4441                let _ = cache_ref.get_with(1, || {
4442                    barrier_ref.wait();
4443                    thread::sleep(Duration::from_millis(50));
4444                    panic!("Panic during get_with");
4445                });
4446            });
4447        }
4448
4449        barrier.wait();
4450        assert_eq!(cache.get_with(1, || 5), 5);
4451
4452        assert!(cache.is_waiter_map_empty());
4453    }
4454
4455    #[test]
4456    // https://github.com/moka-rs/moka/issues/43
4457    fn handle_panic_in_try_get_with() {
4458        use std::{sync::Barrier, thread};
4459
4460        let cache = Cache::new(16);
4461        let barrier = Arc::new(Barrier::new(2));
4462        {
4463            let cache_ref = cache.clone();
4464            let barrier_ref = barrier.clone();
4465            thread::spawn(move || {
4466                let _ = cache_ref.try_get_with(1, || {
4467                    barrier_ref.wait();
4468                    thread::sleep(Duration::from_millis(50));
4469                    panic!("Panic during try_get_with");
4470                }) as Result<_, Arc<Infallible>>;
4471            });
4472        }
4473
4474        barrier.wait();
4475        assert_eq!(
4476            cache.try_get_with(1, || Ok(5)) as Result<_, Arc<Infallible>>,
4477            Ok(5)
4478        );
4479
4480        assert!(cache.is_waiter_map_empty());
4481    }
4482
4483    #[test]
4484    fn test_removal_notifications() {
4485        // The following `Vec`s will hold actual and expected notifications.
4486        let actual = Arc::new(Mutex::new(Vec::new()));
4487        let mut expected = Vec::new();
4488
4489        // Create an eviction listener.
4490        let a1 = Arc::clone(&actual);
4491        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
4492
4493        // Create a cache with the eviction listener.
4494        let mut cache = Cache::builder()
4495            .max_capacity(3)
4496            .eviction_listener(listener)
4497            .build();
4498        cache.reconfigure_for_testing();
4499
4500        // Make the cache exterior immutable.
4501        let cache = cache;
4502
4503        cache.insert('a', "alice");
4504        cache.invalidate(&'a');
4505        expected.push((Arc::new('a'), "alice", RemovalCause::Explicit));
4506
4507        cache.run_pending_tasks();
4508        assert_eq!(cache.entry_count(), 0);
4509
4510        cache.insert('b', "bob");
4511        cache.insert('c', "cathy");
4512        cache.insert('d', "david");
4513        cache.run_pending_tasks();
4514        assert_eq!(cache.entry_count(), 3);
4515
4516        // This will be rejected due to the size constraint.
4517        cache.insert('e', "emily");
4518        expected.push((Arc::new('e'), "emily", RemovalCause::Size));
4519        cache.run_pending_tasks();
4520        assert_eq!(cache.entry_count(), 3);
4521
4522        // Raise the popularity of 'e' so it will be accepted next time.
4523        cache.get(&'e');
4524        cache.run_pending_tasks();
4525
4526        // Retry.
4527        cache.insert('e', "eliza");
4528        // and the LRU entry will be evicted.
4529        expected.push((Arc::new('b'), "bob", RemovalCause::Size));
4530        cache.run_pending_tasks();
4531        assert_eq!(cache.entry_count(), 3);
4532
4533        // Replace an existing entry.
4534        cache.insert('d', "dennis");
4535        expected.push((Arc::new('d'), "david", RemovalCause::Replaced));
4536        cache.run_pending_tasks();
4537        assert_eq!(cache.entry_count(), 3);
4538
4539        verify_notification_vec(&cache, actual, &expected);
4540    }
4541
4542    #[test]
4543    fn test_immediate_removal_notifications_with_updates() {
4544        // The following `Vec` will hold actual notifications.
4545        let actual = Arc::new(Mutex::new(Vec::new()));
4546
4547        // Create an eviction listener.
4548        let a1 = Arc::clone(&actual);
4549        let listener = move |k, v, cause| a1.lock().push((k, v, cause));
4550
4551        let (clock, mock) = Clock::mock();
4552
4553        // Create a cache with the eviction listener and also TTL and TTI.
4554        let mut cache = Cache::builder()
4555            .eviction_listener(listener)
4556            .time_to_live(Duration::from_secs(7))
4557            .time_to_idle(Duration::from_secs(5))
4558            .clock(clock)
4559            .build();
4560        cache.reconfigure_for_testing();
4561
4562        // Make the cache exterior immutable.
4563        let cache = cache;
4564
4565        cache.insert("alice", "a0");
4566        cache.run_pending_tasks();
4567
4568        // Now alice (a0) has been expired by the idle timeout (TTI).
4569        mock.increment(Duration::from_secs(6));
4570        assert_eq!(cache.get(&"alice"), None);
4571
4572        // We have not ran sync after the expiration of alice (a0), so it is
4573        // still in the cache.
4574        assert_eq!(cache.entry_count(), 1);
4575
4576        // Re-insert alice with a different value. Since alice (a0) is still
4577        // in the cache, this is actually a replace operation rather than an
4578        // insert operation. We want to verify that the RemovalCause of a0 is
4579        // Expired, not Replaced.
4580        cache.insert("alice", "a1");
4581        {
4582            let mut a = actual.lock();
4583            assert_eq!(a.len(), 1);
4584            assert_eq!(a[0], (Arc::new("alice"), "a0", RemovalCause::Expired));
4585            a.clear();
4586        }
4587
4588        cache.run_pending_tasks();
4589
4590        mock.increment(Duration::from_secs(4));
4591        assert_eq!(cache.get(&"alice"), Some("a1"));
4592        cache.run_pending_tasks();
4593
4594        // Now alice has been expired by time-to-live (TTL).
4595        mock.increment(Duration::from_secs(4));
4596        assert_eq!(cache.get(&"alice"), None);
4597
4598        // But, again, it is still in the cache.
4599        assert_eq!(cache.entry_count(), 1);
4600
4601        // Re-insert alice with a different value and verify that the
4602        // RemovalCause of a1 is Expired (not Replaced).
4603        cache.insert("alice", "a2");
4604        {
4605            let mut a = actual.lock();
4606            assert_eq!(a.len(), 1);
4607            assert_eq!(a[0], (Arc::new("alice"), "a1", RemovalCause::Expired));
4608            a.clear();
4609        }
4610
4611        cache.run_pending_tasks();
4612
4613        assert_eq!(cache.entry_count(), 1);
4614
4615        // Now alice (a2) has been expired by the idle timeout.
4616        mock.increment(Duration::from_secs(6));
4617        assert_eq!(cache.get(&"alice"), None);
4618        assert_eq!(cache.entry_count(), 1);
4619
4620        // This invalidate will internally remove alice (a2).
4621        cache.invalidate(&"alice");
4622        cache.run_pending_tasks();
4623        assert_eq!(cache.entry_count(), 0);
4624
4625        {
4626            let mut a = actual.lock();
4627            assert_eq!(a.len(), 1);
4628            assert_eq!(a[0], (Arc::new("alice"), "a2", RemovalCause::Expired));
4629            a.clear();
4630        }
4631
4632        // Re-insert, and this time, make it expired by the TTL.
4633        cache.insert("alice", "a3");
4634        cache.run_pending_tasks();
4635        mock.increment(Duration::from_secs(4));
4636        assert_eq!(cache.get(&"alice"), Some("a3"));
4637        cache.run_pending_tasks();
4638        mock.increment(Duration::from_secs(4));
4639        assert_eq!(cache.get(&"alice"), None);
4640        assert_eq!(cache.entry_count(), 1);
4641
4642        // This invalidate will internally remove alice (a2).
4643        cache.invalidate(&"alice");
4644        cache.run_pending_tasks();
4645
4646        assert_eq!(cache.entry_count(), 0);
4647
4648        {
4649            let mut a = actual.lock();
4650            assert_eq!(a.len(), 1);
4651            assert_eq!(a[0], (Arc::new("alice"), "a3", RemovalCause::Expired));
4652            a.clear();
4653        }
4654
4655        assert!(cache.key_locks_map_is_empty());
4656    }
4657
4658    // This test ensures the key-level lock for the immediate notification
4659    // delivery mode is working so that the notifications for a given key
4660    // should always be ordered. This is true even if multiple client threads
4661    // try to modify the entries for the key at the same time. (This test will
4662    // run three client threads)
4663    //
4664    // This test is ignored by default. It becomes unstable when run in parallel
4665    // with other tests.
4666    #[test]
4667    #[ignore]
4668    fn test_key_lock_used_by_immediate_removal_notifications() {
4669        use std::thread::{sleep, spawn};
4670
4671        const KEY: &str = "alice";
4672
4673        type Val = &'static str;
4674
4675        #[derive(PartialEq, Eq, Debug)]
4676        enum Event {
4677            Insert(Val),
4678            Invalidate(Val),
4679            BeginNotify(Val, RemovalCause),
4680            EndNotify(Val, RemovalCause),
4681        }
4682
4683        // The following `Vec will hold actual notifications.
4684        let actual = Arc::new(Mutex::new(Vec::new()));
4685
4686        // Create an eviction listener.
4687        // Note that this listener is slow and will take 300 ms to complete.
4688        let a0 = Arc::clone(&actual);
4689        let listener = move |_k, v, cause| {
4690            a0.lock().push(Event::BeginNotify(v, cause));
4691            sleep(Duration::from_millis(300));
4692            a0.lock().push(Event::EndNotify(v, cause));
4693        };
4694
4695        // Create a cache with the eviction listener and also TTL 500 ms.
4696        let mut cache = Cache::builder()
4697            .eviction_listener(listener)
4698            .time_to_live(Duration::from_millis(500))
4699            .build();
4700        cache.reconfigure_for_testing();
4701
4702        // Make the cache exterior immutable.
4703        let cache = cache;
4704
4705        // - Notifications for the same key must not overlap.
4706
4707        // Time  Event
4708        // ----- -------------------------------------
4709        // 0000: Insert value a0
4710        // 0500: a0 expired
4711        // 0600: Insert value a1 -> expired a0 (N-A0)
4712        // 0800: Insert value a2 (waiting) (A-A2)
4713        // 0900: N-A0 processed
4714        //       A-A2 finished waiting -> replace a1 (N-A1)
4715        // 1100: Invalidate (waiting) (R-A2)
4716        // 1200: N-A1 processed
4717        //       R-A2 finished waiting -> explicit a2 (N-A2)
4718        // 1500: N-A2 processed
4719
4720        let expected = vec![
4721            Event::Insert("a0"),
4722            Event::Insert("a1"),
4723            Event::BeginNotify("a0", RemovalCause::Expired),
4724            Event::Insert("a2"),
4725            Event::EndNotify("a0", RemovalCause::Expired),
4726            Event::BeginNotify("a1", RemovalCause::Replaced),
4727            Event::Invalidate("a2"),
4728            Event::EndNotify("a1", RemovalCause::Replaced),
4729            Event::BeginNotify("a2", RemovalCause::Explicit),
4730            Event::EndNotify("a2", RemovalCause::Explicit),
4731        ];
4732
4733        // 0000: Insert value a0
4734        actual.lock().push(Event::Insert("a0"));
4735        cache.insert(KEY, "a0");
        // Call `run_pending_tasks` to set the last modified for the KEY
        // immediately so that this entry should expire in 500 ms from now.
4738        cache.run_pending_tasks();
4739
        // 0600: Insert value a1 -> expired a0 (N-A0)
4741        let thread1 = {
4742            let a1 = Arc::clone(&actual);
4743            let c1 = cache.clone();
4744            spawn(move || {
4745                sleep(Duration::from_millis(600));
4746                a1.lock().push(Event::Insert("a1"));
4747                c1.insert(KEY, "a1");
4748            })
4749        };
4750
4751        // 0800: Insert value a2 (waiting) (A-A2)
4752        let thread2 = {
4753            let a2 = Arc::clone(&actual);
4754            let c2 = cache.clone();
4755            spawn(move || {
4756                sleep(Duration::from_millis(800));
4757                a2.lock().push(Event::Insert("a2"));
4758                c2.insert(KEY, "a2");
4759            })
4760        };
4761
4762        // 1100: Invalidate (waiting) (R-A2)
4763        let thread3 = {
4764            let a3 = Arc::clone(&actual);
4765            let c3 = cache.clone();
4766            spawn(move || {
4767                sleep(Duration::from_millis(1100));
4768                a3.lock().push(Event::Invalidate("a2"));
4769                c3.invalidate(&KEY);
4770            })
4771        };
4772
4773        for t in [thread1, thread2, thread3] {
4774            t.join().expect("Failed to join");
4775        }
4776
4777        let actual = actual.lock();
4778        assert_eq!(actual.len(), expected.len());
4779
4780        for (i, (actual, expected)) in actual.iter().zip(&expected).enumerate() {
4781            assert_eq!(actual, expected, "expected[{i}]");
4782        }
4783
4784        assert!(cache.key_locks_map_is_empty());
4785    }
4786
4787    // When the eviction listener is not set, calling `run_pending_tasks` once should
4788    // evict all entries that can be removed.
4789    #[test]
4790    fn no_batch_size_limit_on_eviction() {
4791        const MAX_CAPACITY: u64 = 20;
4792
4793        const EVICTION_TIMEOUT: Duration = Duration::from_nanos(0);
4794        const MAX_LOG_SYNC_REPEATS: u32 = 1;
4795        const EVICTION_BATCH_SIZE: u32 = 1;
4796
4797        let hk_conf = HousekeeperConfig::new(
4798            // Timeout should be ignored when the eviction listener is not provided.
4799            Some(EVICTION_TIMEOUT),
4800            Some(MAX_LOG_SYNC_REPEATS),
4801            Some(EVICTION_BATCH_SIZE),
4802        );
4803
4804        // Create a cache with the LRU policy.
4805        let mut cache = Cache::builder()
4806            .max_capacity(MAX_CAPACITY)
4807            .eviction_policy(EvictionPolicy::lru())
4808            .housekeeper_config(hk_conf)
4809            .build();
4810        cache.reconfigure_for_testing();
4811
4812        // Make the cache exterior immutable.
4813        let cache = cache;
4814
4815        // Fill the cache.
4816        for i in 0..MAX_CAPACITY {
4817            let v = format!("v{i}");
4818            cache.insert(i, v)
4819        }
4820        // The max capacity should not change because we have not called
4821        // `run_pending_tasks` yet.
4822        assert_eq!(cache.entry_count(), 0);
4823
4824        cache.run_pending_tasks();
4825        assert_eq!(cache.entry_count(), MAX_CAPACITY);
4826
4827        // Insert more items to the cache.
4828        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
4829            let v = format!("v{i}");
4830            cache.insert(i, v)
4831        }
4832        // The max capacity should not change because we have not called
4833        // `run_pending_tasks` yet.
4834        assert_eq!(cache.entry_count(), MAX_CAPACITY);
4835        // Both old and new keys should exist.
4836        assert!(cache.contains_key(&0)); // old
4837        assert!(cache.contains_key(&(MAX_CAPACITY - 1))); // old
4838        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1))); // new
4839
4840        // Process the remaining write op logs (there should be MAX_CAPACITY logs),
4841        // and evict the LRU entries.
4842        cache.run_pending_tasks();
4843        assert_eq!(cache.entry_count(), MAX_CAPACITY);
4844
4845        // Now all the old keys should be gone.
4846        assert!(!cache.contains_key(&0));
4847        assert!(!cache.contains_key(&(MAX_CAPACITY - 1)));
4848        // And the new keys should exist.
4849        assert!(cache.contains_key(&(MAX_CAPACITY * 2 - 1)));
4850    }
4851
4852    #[test]
4853    fn slow_eviction_listener() {
4854        const MAX_CAPACITY: u64 = 20;
4855
4856        const EVICTION_TIMEOUT: Duration = Duration::from_millis(30);
4857        const LISTENER_DELAY: Duration = Duration::from_millis(11);
4858        const MAX_LOG_SYNC_REPEATS: u32 = 1;
4859        const EVICTION_BATCH_SIZE: u32 = 1;
4860
4861        let hk_conf = HousekeeperConfig::new(
4862            Some(EVICTION_TIMEOUT),
4863            Some(MAX_LOG_SYNC_REPEATS),
4864            Some(EVICTION_BATCH_SIZE),
4865        );
4866
4867        let (clock, mock) = Clock::mock();
4868        let listener_call_count = Arc::new(AtomicU8::new(0));
4869        let lcc = Arc::clone(&listener_call_count);
4870
4871        // A slow eviction listener that spend `LISTENER_DELAY` to process a removal
4872        // notification.
4873        let listener = move |_k, _v, _cause| {
4874            mock.increment(LISTENER_DELAY);
4875            lcc.fetch_add(1, Ordering::AcqRel);
4876        };
4877
4878        // Create a cache with the LRU policy.
4879        let mut cache = Cache::builder()
4880            .max_capacity(MAX_CAPACITY)
4881            .eviction_policy(EvictionPolicy::lru())
4882            .eviction_listener(listener)
4883            .housekeeper_config(hk_conf)
4884            .clock(clock)
4885            .build();
4886        cache.reconfigure_for_testing();
4887
4888        // Make the cache exterior immutable.
4889        let cache = cache;
4890
4891        // Fill the cache.
4892        for i in 0..MAX_CAPACITY {
4893            let v = format!("v{i}");
4894            cache.insert(i, v)
4895        }
4896        // The max capacity should not change because we have not called
4897        // `run_pending_tasks` yet.
4898        assert_eq!(cache.entry_count(), 0);
4899
4900        cache.run_pending_tasks();
4901        assert_eq!(listener_call_count.load(Ordering::Acquire), 0);
4902        assert_eq!(cache.entry_count(), MAX_CAPACITY);
4903
4904        // Insert more items to the cache.
4905        for i in MAX_CAPACITY..(MAX_CAPACITY * 2) {
4906            let v = format!("v{i}");
4907            cache.insert(i, v);
4908        }
4909        assert_eq!(cache.entry_count(), MAX_CAPACITY);
4910
4911        cache.run_pending_tasks();
4912        // Because of the slow listener, cache should get an over capacity.
4913        let mut expected_call_count = 3;
4914        assert_eq!(
4915            listener_call_count.load(Ordering::Acquire) as u64,
4916            expected_call_count
4917        );
4918        assert_eq!(cache.entry_count(), MAX_CAPACITY * 2 - expected_call_count);
4919
4920        loop {
4921            cache.run_pending_tasks();
4922
4923            expected_call_count += 3;
4924            if expected_call_count > MAX_CAPACITY {
4925                expected_call_count = MAX_CAPACITY;
4926            }
4927
4928            let actual_count = listener_call_count.load(Ordering::Acquire) as u64;
4929            assert_eq!(actual_count, expected_call_count);
4930            let expected_entry_count = MAX_CAPACITY * 2 - expected_call_count;
4931            assert_eq!(cache.entry_count(), expected_entry_count);
4932
4933            if expected_call_count >= MAX_CAPACITY {
4934                break;
4935            }
4936        }
4937
4938        assert_eq!(cache.entry_count(), MAX_CAPACITY);
4939    }
4940
4941    // NOTE: To enable the panic logging, run the following command:
4942    //
4943    // RUST_LOG=moka=info cargo test --features 'logging' -- \
4944    //   sync::cache::tests::recover_from_panicking_eviction_listener --exact --nocapture
4945    //
4946    #[test]
4947    fn recover_from_panicking_eviction_listener() {
4948        #[cfg(feature = "logging")]
4949        let _ = env_logger::builder().is_test(true).try_init();
4950
4951        // The following `Vec`s will hold actual and expected notifications.
4952        let actual = Arc::new(Mutex::new(Vec::new()));
4953        let mut expected = Vec::new();
4954
4955        // Create an eviction listener that panics when it see
4956        // a value "panic now!".
4957        let a1 = Arc::clone(&actual);
4958        let listener = move |k, v, cause| {
4959            if v == "panic now!" {
4960                panic!("Panic now!");
4961            }
4962            a1.lock().push((k, v, cause))
4963        };
4964
4965        // Create a cache with the eviction listener.
4966        let mut cache = Cache::builder()
4967            .name("My Sync Cache")
4968            .eviction_listener(listener)
4969            .build();
4970        cache.reconfigure_for_testing();
4971
4972        // Make the cache exterior immutable.
4973        let cache = cache;
4974
4975        // Insert an okay value.
4976        cache.insert("alice", "a0");
4977        cache.run_pending_tasks();
4978
4979        // Insert a value that will cause the eviction listener to panic.
4980        cache.insert("alice", "panic now!");
4981        expected.push((Arc::new("alice"), "a0", RemovalCause::Replaced));
4982        cache.run_pending_tasks();
4983
4984        // Insert an okay value. This will replace the previous
4985        // value "panic now!" so the eviction listener will panic.
4986        cache.insert("alice", "a2");
4987        cache.run_pending_tasks();
4988        // No more removal notification should be sent.
4989
4990        // Invalidate the okay value.
4991        cache.invalidate(&"alice");
4992        cache.run_pending_tasks();
4993
4994        verify_notification_vec(&cache, actual, &expected);
4995    }
4996
4997    // This test ensures that the `contains_key`, `get` and `invalidate` can use
4998    // borrowed form `&[u8]` for key with type `Vec<u8>`.
4999    // https://github.com/moka-rs/moka/issues/166
5000    #[test]
5001    fn borrowed_forms_of_key() {
5002        let cache: Cache<Vec<u8>, ()> = Cache::new(1);
5003
5004        let key = vec![1_u8];
5005        cache.insert(key.clone(), ());
5006
5007        // key as &Vec<u8>
5008        let key_v: &Vec<u8> = &key;
5009        assert!(cache.contains_key(key_v));
5010        assert_eq!(cache.get(key_v), Some(()));
5011        cache.invalidate(key_v);
5012
5013        cache.insert(key, ());
5014
5015        // key as &[u8]
5016        let key_s: &[u8] = &[1_u8];
5017        assert!(cache.contains_key(key_s));
5018        assert_eq!(cache.get(key_s), Some(()));
5019        cache.invalidate(key_s);
5020    }
5021
5022    // Ignored by default. This test becomes unstable when run in parallel with
5023    // other tests.
5024    #[test]
5025    #[ignore]
5026    fn drop_value_immediately_after_eviction() {
5027        use crate::common::test_utils::{Counters, Value};
5028
5029        const MAX_CAPACITY: u32 = 500;
5030        const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;
5031
5032        let counters = Arc::new(Counters::default());
5033        let counters1 = Arc::clone(&counters);
5034
5035        let listener = move |_k, _v, cause| match cause {
5036            RemovalCause::Size => counters1.incl_evicted(),
5037            RemovalCause::Explicit => counters1.incl_invalidated(),
5038            _ => (),
5039        };
5040
5041        let mut cache = Cache::builder()
5042            .max_capacity(MAX_CAPACITY as u64)
5043            .eviction_listener(listener)
5044            .build();
5045        cache.reconfigure_for_testing();
5046
5047        // Make the cache exterior immutable.
5048        let cache = cache;
5049
5050        for key in 0..KEYS {
5051            let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
5052            cache.insert(key, value);
5053            counters.incl_inserted();
5054            cache.run_pending_tasks();
5055        }
5056
5057        let eviction_count = KEYS - MAX_CAPACITY;
5058
5059        cache.run_pending_tasks();
5060        assert_eq!(counters.inserted(), KEYS, "inserted");
5061        assert_eq!(counters.value_created(), KEYS, "value_created");
5062        assert_eq!(counters.evicted(), eviction_count, "evicted");
5063        assert_eq!(counters.invalidated(), 0, "invalidated");
5064        assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");
5065
5066        for key in 0..KEYS {
5067            cache.invalidate(&key);
5068            cache.run_pending_tasks();
5069        }
5070
5071        cache.run_pending_tasks();
5072        assert_eq!(counters.inserted(), KEYS, "inserted");
5073        assert_eq!(counters.value_created(), KEYS, "value_created");
5074        assert_eq!(counters.evicted(), eviction_count, "evicted");
5075        assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
5076        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5077
5078        std::mem::drop(cache);
5079        assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
5080    }
5081
5082    // For testing the issue reported by: https://github.com/moka-rs/moka/issues/383
5083    //
5084    // Ignored by default. This test becomes unstable when run in parallel with
5085    // other tests.
5086    #[test]
5087    #[ignore]
5088    fn ensure_gc_runs_when_dropping_cache() {
5089        let cache = Cache::builder().build();
5090        let val = Arc::new(0);
5091        {
5092            let val = Arc::clone(&val);
5093            cache.get_with(1, move || val);
5094        }
5095        drop(cache);
5096        assert_eq!(Arc::strong_count(&val), 1);
5097    }
5098
5099    #[test]
5100    fn test_debug_format() {
5101        let cache = Cache::new(10);
5102        cache.insert('a', "alice");
5103        cache.insert('b', "bob");
5104        cache.insert('c', "cindy");
5105
5106        let debug_str = format!("{cache:?}");
5107        assert!(debug_str.starts_with('{'));
5108        assert!(debug_str.contains(r#"'a': "alice""#));
5109        assert!(debug_str.contains(r#"'b': "bob""#));
5110        assert!(debug_str.contains(r#"'c': "cindy""#));
5111        assert!(debug_str.ends_with('}'));
5112    }
5113
5114    type NotificationTuple<K, V> = (Arc<K>, V, RemovalCause);
5115
5116    fn verify_notification_vec<K, V, S>(
5117        cache: &Cache<K, V, S>,
5118        actual: Arc<Mutex<Vec<NotificationTuple<K, V>>>>,
5119        expected: &[NotificationTuple<K, V>],
5120    ) where
5121        K: std::hash::Hash + Eq + std::fmt::Debug + Send + Sync + 'static,
5122        V: Eq + std::fmt::Debug + Clone + Send + Sync + 'static,
5123        S: std::hash::BuildHasher + Clone + Send + Sync + 'static,
5124    {
5125        // Retries will be needed when testing in a QEMU VM.
5126        const MAX_RETRIES: usize = 5;
5127        let mut retries = 0;
5128        loop {
5129            // Ensure all scheduled notifications have been processed.
5130            cache.run_pending_tasks();
5131            std::thread::sleep(Duration::from_millis(500));
5132
5133            let actual = &*actual.lock();
5134            if actual.len() != expected.len() {
5135                if retries <= MAX_RETRIES {
5136                    retries += 1;
5137                    continue;
5138                } else {
5139                    assert_eq!(actual.len(), expected.len(), "Retries exhausted");
5140                }
5141            }
5142
5143            for (i, (actual, expected)) in actual.iter().zip(expected).enumerate() {
5144                assert_eq!(actual, expected, "expected[{i}]");
5145            }
5146
5147            break;
5148        }
5149    }
5150}